Skip to content

Commit a190ba7

Browse files
committed
Add Missing Pause/Resume Events
Resolves #2387 - `FallbackBatchErrorHandler` - `Acknowledgment.nack()` **2.9.x only - I will back port/forward port**
1 parent e1bb194 commit a190ba7

File tree

9 files changed

+109
-19
lines changed

9 files changed

+109
-19
lines changed

spring-kafka/src/main/java/org/springframework/kafka/event/ConsumerPausedEvent.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2020 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@ public class ConsumerPausedEvent extends KafkaEvent {
3333

3434
private final Collection<TopicPartition> partitions;
3535

36+
private final String reason;
37+
3638
/**
3739
* Construct an instance with the provided source and partitions.
3840
* @param source the container instance that generated the event.
@@ -43,6 +45,23 @@ public class ConsumerPausedEvent extends KafkaEvent {
4345
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions) {
4446
super(source, container);
4547
this.partitions = partitions;
48+
this.reason = null;
49+
}
50+
51+
/**
52+
* Construct an instance with the provided source and partitions.
53+
* @param source the container instance that generated the event.
54+
* @param container the container or the parent container if the container is a child.
55+
* @param partitions the partitions.
56+
* @param reason the reason for the pause.
57+
* @since 2.8.9
58+
*/
59+
public ConsumerPausedEvent(Object source, Object container, Collection<TopicPartition> partitions,
60+
String reason) {
61+
62+
super(source, container);
63+
this.partitions = partitions;
64+
this.reason = reason;
4665
}
4766

4867
/**
@@ -55,7 +74,7 @@ public Collection<TopicPartition> getPartitions() {
5574

5675
@Override
5776
public String toString() {
58-
return "ConsumerPausedEvent [partitions=" + this.partitions + "]";
77+
return "ConsumerPausedEvent [reason=" + this.reason + ", partitions=" + this.partitions + "]";
5978
}
6079

6180
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CommonErrorHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,8 +157,24 @@ default void setAckAfterHandle(boolean ack) {
157157
* @param consumer the consumer.
158158
* @param partitions the newly assigned partitions.
159159
* @since 2.8.8
160+
* @deprecated in favor of {@link #onPartitionsAssigned(Consumer, Collection, Runnable)}.
160161
*/
162+
@Deprecated
161163
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
162164
}
163165

166+
/**
167+
* Called when partitions are assigned.
168+
* @param consumer the consumer.
169+
* @param partitions the newly assigned partitions.
170+
* @param publishPause called to publish a consumer paused event.
171+
* @since 2.8.9
172+
*/
173+
@SuppressWarnings("deprecation")
174+
default void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
175+
Runnable publishPause) {
176+
177+
onPartitionsAssigned(consumer, partitions);
178+
}
179+
164180
}

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultErrorHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,10 @@ public void handleOtherException(Exception thrownException, Consumer<?, ?> consu
160160
}
161161

162162
@Override
163-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
164-
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions);
163+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
164+
Runnable publishPause) {
165+
166+
getFallbackBatchHandler().onPartitionsAssigned(consumer, partitions, publishPause);
165167
}
166168

167169
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlerAdapter.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,12 @@ public void handleBatch(Exception thrownException, ConsumerRecords<?, ?> data, C
161161

162162
@SuppressWarnings("deprecation")
163163
@Override
164-
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
164+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
165+
Runnable publishPause) {
166+
165167
if (this.batchErrorHandler instanceof RetryingBatchErrorHandler) {
166-
((RetryingBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions);
168+
((RetryingBatchErrorHandler) this.batchErrorHandler).onPartitionsAssigned(consumer, partitions,
169+
publishPause);
167170
}
168171
}
169172

spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package org.springframework.kafka.listener;
1818

1919
import java.time.Duration;
20+
import java.util.Set;
2021
import java.util.function.BiConsumer;
2122

2223
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
25+
import org.apache.kafka.common.TopicPartition;
2426

2527
import org.springframework.core.log.LogAccessor;
2628
import org.springframework.kafka.KafkaException;
@@ -62,7 +64,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
6264
BackOffExecution execution = backOff.start();
6365
long nextBackOff = execution.nextBackOff();
6466
String failed = null;
65-
consumer.pause(consumer.assignment());
67+
Set<TopicPartition> assignment = consumer.assignment();
68+
consumer.pause(assignment);
69+
if (container instanceof KafkaMessageListenerContainer) {
70+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerPausedEvent(assignment, "For batch retry");
71+
}
6672
try {
6773
while (nextBackOff != BackOffExecution.STOP) {
6874
consumer.poll(Duration.ZERO);
@@ -99,7 +105,11 @@ public static void retryBatch(Exception thrownException, ConsumerRecords<?, ?> r
99105
}
100106
}
101107
finally {
102-
consumer.resume(consumer.assignment());
108+
Set<TopicPartition> assignment2 = consumer.assignment();
109+
consumer.resume(assignment2);
110+
if (container instanceof KafkaMessageListenerContainer) {
111+
((KafkaMessageListenerContainer<?, ?>) container).publishConsumerResumedEvent(assignment2);
112+
}
103113
}
104114
}
105115

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -442,15 +442,15 @@ private void publishNonResponsiveConsumerEvent(long timeSinceLastPoll, Consumer<
442442
}
443443
}
444444

445-
private void publishConsumerPausedEvent(Collection<TopicPartition> partitions) {
445+
void publishConsumerPausedEvent(Collection<TopicPartition> partitions, String reason) {
446446
ApplicationEventPublisher publisher = getApplicationEventPublisher();
447447
if (publisher != null) {
448448
publisher.publishEvent(new ConsumerPausedEvent(this, this.thisOrParentContainer,
449-
Collections.unmodifiableCollection(partitions)));
449+
Collections.unmodifiableCollection(partitions), reason));
450450
}
451451
}
452452

453-
private void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
453+
void publishConsumerResumedEvent(Collection<TopicPartition> partitions) {
454454
ApplicationEventPublisher publisher = getApplicationEventPublisher();
455455
if (publisher != null) {
456456
publisher.publishEvent(new ConsumerResumedEvent(this, this.thisOrParentContainer,
@@ -1633,7 +1633,9 @@ private void doPauseConsumerIfNecessary() {
16331633
this.consumer.pause(assigned);
16341634
this.consumerPaused = true;
16351635
this.logger.debug(() -> "Paused consumption from: " + this.consumer.paused());
1636-
publishConsumerPausedEvent(assigned);
1636+
publishConsumerPausedEvent(assigned, this.pausedForAsyncAcks
1637+
? "Incomplete out of order acks"
1638+
: "User requested");
16371639
}
16381640
}
16391641
}
@@ -1644,6 +1646,7 @@ private void resumeConsumerIfNeccessary() {
16441646
this.nackWakeTimeMillis = 0;
16451647
this.consumer.resume(this.pausedForNack);
16461648
this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
1649+
publishConsumerResumedEvent(this.pausedForNack);
16471650
this.pausedForNack.clear();
16481651
}
16491652
}
@@ -2539,6 +2542,7 @@ private void pauseForNackSleep() {
25392542
this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
25402543
try {
25412544
this.consumer.pause(this.pausedForNack);
2545+
publishConsumerPausedEvent(this.pausedForNack, "Nack with sleep time received");
25422546
}
25432547
catch (IllegalStateException ex) {
25442548
// this should never happen; defensive, just in case...
@@ -3375,7 +3379,8 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
33753379
}
33763380
if (ListenerConsumer.this.commonErrorHandler != null) {
33773381
ListenerConsumer.this.commonErrorHandler.onPartitionsAssigned(ListenerConsumer.this.consumer,
3378-
partitions);
3382+
partitions, () -> publishConsumerPausedEvent(partitions,
3383+
"Paused by error handler after rebalance"));
33793384
}
33803385
}
33813386

@@ -3386,7 +3391,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
33863391
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
33873392
+ "consumer paused again, so the initial poll() will never return any records");
33883393
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + partitions);
3389-
publishConsumerPausedEvent(partitions);
3394+
publishConsumerPausedEvent(partitions, "Re-paused after rebalance");
33903395
}
33913396
Collection<TopicPartition> toRepause = new LinkedList<>();
33923397
partitions.forEach(tp -> {
@@ -3397,7 +3402,7 @@ private void repauseIfNeeded(Collection<TopicPartition> partitions) {
33973402
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
33983403
ListenerConsumer.this.consumer.pause(toRepause);
33993404
ListenerConsumer.this.logger.debug(() -> "Paused consumption from: " + toRepause);
3400-
publishConsumerPausedEvent(toRepause);
3405+
publishConsumerPausedEvent(toRepause, "Re-paused after rebalance");
34013406
}
34023407
this.revoked.removeAll(toRepause);
34033408
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);

spring-kafka/src/main/java/org/springframework/kafka/listener/RetryingBatchErrorHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,20 @@ public void handle(Exception thrownException, @Nullable ConsumerRecords<?, ?> re
114114
}
115115
}
116116

117+
@Deprecated
117118
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
118119
if (this.retrying.get()) {
119120
consumer.pause(consumer.assignment());
120121
}
121122
}
122123

124+
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
125+
Runnable publishPause) {
126+
127+
if (this.retrying.get()) {
128+
consumer.pause(consumer.assignment());
129+
publishPause.run();
130+
}
131+
}
132+
123133
}

spring-kafka/src/test/java/org/springframework/kafka/listener/FallbackBatchErrorHandlerTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,25 +189,31 @@ void rePauseOnRebalance() {
189189
Collections.singletonList(new ConsumerRecord<>("foo", 1, 0L, "foo", "bar")));
190190
ConsumerRecords<?, ?> records = new ConsumerRecords<>(map);
191191
Consumer<?, ?> consumer = mock(Consumer.class);
192+
given(consumer.assignment()).willReturn(map.keySet());
193+
AtomicBoolean pubPauseCalled = new AtomicBoolean();
192194
willAnswer(inv -> {
193-
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
195+
eh.onPartitionsAssigned(consumer, List.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)),
196+
() -> pubPauseCalled.set(true));
194197
return records;
195198
}).given(consumer).poll(any());
196-
MessageListenerContainer container = mock(MessageListenerContainer.class);
199+
KafkaMessageListenerContainer<?, ?> container = mock(KafkaMessageListenerContainer.class);
197200
given(container.isRunning()).willReturn(true);
198201
eh.handle(new RuntimeException(), records, consumer, container, () -> {
199202
this.invoked++;
200203
throw new RuntimeException();
201204
});
202205
assertThat(this.invoked).isEqualTo(1);
203206
assertThat(recovered).hasSize(2);
204-
InOrder inOrder = inOrder(consumer);
207+
InOrder inOrder = inOrder(consumer, container);
205208
inOrder.verify(consumer).pause(any());
209+
inOrder.verify(container).publishConsumerPausedEvent(map.keySet(), "For batch retry");
206210
inOrder.verify(consumer).poll(any());
207211
inOrder.verify(consumer).pause(any());
208212
inOrder.verify(consumer).resume(any());
213+
inOrder.verify(container).publishConsumerResumedEvent(map.keySet());
209214
verify(consumer, times(3)).assignment();
210215
verifyNoMoreInteractions(consumer);
216+
assertThat(pubPauseCalled.get()).isTrue();
211217
}
212218

213219
@Test

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackPauseResumeTests.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,11 +51,14 @@
5151
import org.springframework.beans.factory.annotation.Autowired;
5252
import org.springframework.context.annotation.Bean;
5353
import org.springframework.context.annotation.Configuration;
54+
import org.springframework.context.event.EventListener;
5455
import org.springframework.kafka.annotation.EnableKafka;
5556
import org.springframework.kafka.annotation.KafkaListener;
5657
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
5758
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
5859
import org.springframework.kafka.core.ConsumerFactory;
60+
import org.springframework.kafka.event.ConsumerPausedEvent;
61+
import org.springframework.kafka.event.ConsumerResumedEvent;
5962
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6063
import org.springframework.kafka.support.Acknowledgment;
6164
import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -93,6 +96,8 @@ public void dontResumeAlreadyPaused() throws Exception {
9396
assertThat(this.config.resumedForNack).hasSize(1);
9497
assertThat(this.config.pausedForNack).contains(new TopicPartition("foo", 1));
9598
assertThat(this.config.resumedForNack).contains(new TopicPartition("foo", 1));
99+
assertThat(this.config.pauseEvents).hasSize(1);
100+
assertThat(this.config.resumeEvents).hasSize(1);
96101
}
97102

98103
@Configuration
@@ -113,6 +118,10 @@ public static class Config {
113118

114119
final Set<TopicPartition> resumedForNack = new HashSet<>();
115120

121+
final List<ConsumerPausedEvent> pauseEvents = new ArrayList<>();
122+
123+
final List<ConsumerResumedEvent> resumeEvents = new ArrayList<>();
124+
116125
volatile int count;
117126

118127
volatile long replayTime;
@@ -232,6 +241,16 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
232241
return factory;
233242
}
234243

244+
@EventListener
245+
public void paused(ConsumerPausedEvent event) {
246+
this.pauseEvents.add(event);
247+
}
248+
249+
@EventListener
250+
public void resumed(ConsumerResumedEvent event) {
251+
this.resumeEvents.add(event);
252+
}
253+
235254
}
236255

237256
}

0 commit comments

Comments
 (0)