
Commit be4b1bf

garyrussell authored and artembilan committed

GH-2128 Do Not Sleep Consumer Thread for Nack
Resolves #2128

Suspending polling delays rebalancing; instead, pause the consumer and continue polling. Check whether partitions are already paused, pause only the currently active partitions, and resume them after the sleep interval has passed. Re-pause as necessary after a rebalance.

Also tested with the reporter's reproducer.

**cherry-pick to 2.8.x**
1 parent 220262d commit be4b1bf
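
The approach in this commit can be sketched against the plain Kafka consumer API. The class below is a minimal, hypothetical illustration (the name PauseInsteadOfSleep and the NACK_SLEEP_MS constant are not from the commit): on a nack, record a wake-up time and pause only the assigned partitions that are not already paused; the poll loop keeps running, so group membership and rebalancing are not delayed, and the paused partitions are resumed once the interval has elapsed.

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

// Minimal sketch of "pause instead of sleep" for a nack back-off.
class PauseInsteadOfSleep<K, V> {

    private static final long NACK_SLEEP_MS = 50; // hypothetical back-off interval

    private final Set<TopicPartition> pausedForNack = new HashSet<>();

    private long nackWake;

    // Called when a record/batch is negatively acknowledged with a sleep interval.
    void onNack(Consumer<K, V> consumer) {
        this.nackWake = System.currentTimeMillis() + NACK_SLEEP_MS;
        // pause only partitions not already paused for some other reason,
        // so they are not resumed prematurely later
        Set<TopicPartition> toPause = new HashSet<>(consumer.assignment());
        toPause.removeAll(consumer.paused());
        this.pausedForNack.addAll(toPause);
        consumer.pause(this.pausedForNack);
    }

    // One iteration of the poll loop; polling never stops, so rebalances proceed normally.
    ConsumerRecords<K, V> pollOnce(Consumer<K, V> consumer) {
        ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(100));
        if (this.nackWake > 0 && System.currentTimeMillis() > this.nackWake) {
            consumer.resume(this.pausedForNack); // back-off elapsed; resume the nack-paused partitions
            this.pausedForNack.clear();
            this.nackWake = 0;
        }
        return records;
    }

}

The container itself additionally has to re-apply the nack pause when a rebalance assigns partitions (the pause is otherwise lost across the rebalance) and to drop revoked partitions from the paused set, as the diff below shows.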

File tree

6 files changed: +363 -34 lines changed


spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 44 additions & 14 deletions
@@ -712,6 +712,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

        private final Header infoHeader = new RecordHeader(KafkaHeaders.LISTENER_INFO, this.listenerinfo);

+       private final Set<TopicPartition> pausedForNack = new HashSet<>();
+
        private Map<TopicPartition, OffsetMetadata> definedPartitions;

        private int count;
@@ -728,6 +730,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

        private long nackSleep = -1;

+       private long nackWake;
+
        private int nackIndex;

        private Iterator<TopicPartition> batchIterator;
@@ -1597,6 +1601,10 @@ private void pauseConsumerIfNecessary() {
        }

        private void doPauseConsumerIfNecessary() {
+           if (this.pausedForNack.size() > 0) {
+               this.logger.debug("Still paused for nack sleep");
+               return;
+           }
            if (this.offsetsInThisBatch != null && this.offsetsInThisBatch.size() > 0 && !this.pausedForAsyncAcks) {
                this.pausedForAsyncAcks = true;
                this.logger.debug(() -> "Pausing for incomplete async acks: " + this.offsetsInThisBatch);
@@ -1610,7 +1618,15 @@ private void doPauseConsumerIfNecessary() {
        }

        private void resumeConsumerIfNeccessary() {
-           if (this.offsetsInThisBatch != null) {
+           if (this.nackWake > 0) {
+               if (System.currentTimeMillis() > this.nackWake) {
+                   this.nackWake = 0;
+                   this.consumer.resume(this.pausedForNack);
+                   this.logger.debug(() -> "Resumed after nack sleep: " + this.pausedForNack);
+                   this.pausedForNack.clear();
+               }
+           }
+           else if (this.offsetsInThisBatch != null) {
                synchronized (this) {
                    doResumeConsumerIfNeccessary();
                }
@@ -1654,12 +1670,10 @@ private void pausePartitionsIfNecessary() {
        }

        private void resumePartitionsIfNecessary() {
-           Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
-           List<TopicPartition> partitionsToResume = this
-                   .assignedPartitions
+           List<TopicPartition> partitionsToResume = getAssignedPartitions()
                    .stream()
                    .filter(tp -> !isPartitionPauseRequested(tp)
-                           && pausedConsumerPartitions.contains(tp))
+                           && this.pausedPartitions.contains(tp))
                    .collect(Collectors.toList());
            if (partitionsToResume.size() > 0) {
                this.consumer.resume(partitionsToResume);
@@ -2206,7 +2220,7 @@ private void invokeBatchOnMessage(final ConsumerRecords<K, V> records, // NOSONA
                processCommits();
            }
            SeekUtils.doSeeks(toSeek, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
-           nackSleepAndReset();
+           pauseForNackSleep();
        }
    }

@@ -2467,17 +2481,29 @@ private void handleNack(final ConsumerRecords<K, V> records, final ConsumerRecor
                }
            }
            SeekUtils.doSeeks(list, this.consumer, null, true, (rec, ex) -> false, this.logger); // NOSONAR
-           nackSleepAndReset();
+           pauseForNackSleep();
        }

-       private void nackSleepAndReset() {
-           try {
-               ListenerUtils.stoppableSleep(KafkaMessageListenerContainer.this.thisOrParentContainer, this.nackSleep);
-           }
-           catch (@SuppressWarnings(UNUSED) InterruptedException e) {
-               Thread.currentThread().interrupt();
+       private void pauseForNackSleep() {
+           if (this.nackSleep > 0) {
+               this.nackWake = System.currentTimeMillis() + this.nackSleep;
+               this.nackSleep = -1;
+               Set<TopicPartition> alreadyPaused = this.consumer.paused();
+               this.pausedForNack.addAll(getAssignedPartitions());
+               this.pausedForNack.removeAll(alreadyPaused);
+               this.logger.debug(() -> "Pausing for nack sleep: " + ListenerConsumer.this.pausedForNack);
+               try {
+                   this.consumer.pause(this.pausedForNack);
+               }
+               catch (IllegalStateException ex) {
+                   // this should never happen; defensive, just in case...
+                   this.logger.warn(() -> "Could not pause for nack, possible rebalance in process: "
+                           + ex.getMessage());
+                   Set<TopicPartition> nowPaused = new HashSet<>(this.consumer.paused());
+                   nowPaused.removeAll(alreadyPaused);
+                   this.consumer.resume(nowPaused);
+               }
            }
-           this.nackSleep = -1;
        }

        /**
@@ -3251,6 +3277,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                if (ListenerConsumer.this.assignedPartitions != null) {
                    ListenerConsumer.this.assignedPartitions.removeAll(partitions);
                }
+               ListenerConsumer.this.pausedForNack.removeAll(partitions);
                partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
                synchronized (ListenerConsumer.this) {
                    if (ListenerConsumer.this.offsetsInThisBatch != null) {
@@ -3275,6 +3302,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
                            + "consumer paused again, so the initial poll() will never return any records");
                }
+               if (ListenerConsumer.this.pausedForNack.size() > 0) {
+                   ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
+               }
                ListenerConsumer.this.assignedPartitions.addAll(partitions);
                if (ListenerConsumer.this.commitCurrentOnAssignment
                        && !collectAndCommitIfNecessary(partitions)) {
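
For context, the handleNack()/pauseForNackSleep() path above is triggered by a listener that negatively acknowledges with a sleep interval. A hypothetical batch listener (not part of this commit; it assumes a container configured with a MANUAL or MANUAL_IMMEDIATE ack mode, and process() is a stand-in for real business logic) might look like this; after this change the 50 ms back-off is implemented by pausing the partitions rather than sleeping the consumer thread:

import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

public class NackingBatchListener {

    @KafkaListener(topics = "foo", groupId = "grp")
    public void listen(List<String> in, Acknowledgment ack) {
        int failedIndex = process(in);
        if (failedIndex < 0) {
            ack.acknowledge();
        }
        else {
            // seek back to the failed record and back off for ~50 ms;
            // the container now pauses/resumes instead of blocking the poll loop
            ack.nack(failedIndex, 50);
        }
    }

    private int process(List<String> in) {
        // hypothetical: return -1 on success, or the index of the record that failed
        return -1;
    }

}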

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTests.java

Lines changed: 18 additions & 2 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -197,7 +197,12 @@ public Consumer consumer() {
                            new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
                                    new RecordHeaders(), Optional.empty())));
            final AtomicInteger which = new AtomicInteger();
+           final AtomicBoolean paused = new AtomicBoolean();
            willAnswer(i -> {
+               if (paused.get()) {
+                   Thread.sleep(10);
+                   return ConsumerRecords.empty();
+               }
                this.pollLatch.countDown();
                switch (which.getAndIncrement()) {
                    case 0:
@@ -211,9 +216,20 @@ public Consumer consumer() {
                catch (@SuppressWarnings("unused") InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
-               return new ConsumerRecords(Collections.emptyMap());
+               return ConsumerRecords.empty();
                }
            }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+           willAnswer(i -> {
+               return Collections.emptySet();
+           }).given(consumer).paused();
+           willAnswer(i -> {
+               paused.set(true);
+               return null;
+           }).given(consumer).pause(any());
+           willAnswer(i -> {
+               paused.set(false);
+               return null;
+           }).given(consumer).resume(any());
            willAnswer(i -> {
                this.commitLatch.countDown();
                return null;

spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTxTests.java renamed to spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackBatchTxTests.java

Lines changed: 31 additions & 10 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2017-2021 the original author or authors.
+ * Copyright 2017-2022 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;

 import org.apache.kafka.clients.consumer.Consumer;
@@ -77,7 +78,7 @@
 @SpringJUnitConfig
 @DirtiesContext
 @SuppressWarnings("deprecation")
-public class ManualNackRecordTxTests {
+public class ManualNackBatchTxTests {

    @SuppressWarnings("rawtypes")
    @Autowired
@@ -102,6 +103,7 @@ public class ManualNackRecordTxTests {
    @Test
    public void discardRemainingRecordsFromPollAndSeek() throws Exception {
        assertThat(this.config.deliveryLatch.await(10, TimeUnit.SECONDS)).isTrue();
+       assertThat(this.config.replayTime).isBetween(50L, 30_000L);
        assertThat(this.config.commitLatch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.pollLatch.await(10, TimeUnit.SECONDS)).isTrue();
        this.registry.stop();
@@ -128,24 +130,27 @@ public void discardRemainingRecordsFromPollAndSeek() throws Exception {
    @EnableKafka
    public static class Config {

-       private final List<List<String>> contents = new ArrayList<>();
+       final List<List<String>> contents = new ArrayList<>();

-       private final CountDownLatch pollLatch = new CountDownLatch(3);
+       final CountDownLatch pollLatch = new CountDownLatch(3);

-       private final CountDownLatch deliveryLatch = new CountDownLatch(2);
+       final CountDownLatch deliveryLatch = new CountDownLatch(2);

-       private final CountDownLatch closeLatch = new CountDownLatch(1);
+       final CountDownLatch closeLatch = new CountDownLatch(1);

-       private final CountDownLatch commitLatch = new CountDownLatch(2);
+       final CountDownLatch commitLatch = new CountDownLatch(2);

-       private int count;
+       volatile int count;
+
+       volatile long replayTime;

        @KafkaListener(topics = "foo", groupId = "grp")
        public void foo(List<String> in, Acknowledgment ack) {
            this.contents.add(in);
+           this.replayTime = System.currentTimeMillis() - this.replayTime;
            this.deliveryLatch.countDown();
            if (++this.count == 1) { // part 1, offset 1, first time
-               ack.nack(3, 0);
+               ack.nack(3, 50);
            }
            else {
                ack.acknowledge();
@@ -196,7 +201,12 @@ public Consumer consumer() {
                            new ConsumerRecord("foo", 1, 1L, 0L, TimestampType.NO_TIMESTAMP_TYPE, 0, 0, null, "qux",
                                    new RecordHeaders(), Optional.empty())));
            final AtomicInteger which = new AtomicInteger();
+           final AtomicBoolean paused = new AtomicBoolean();
            willAnswer(i -> {
+               if (paused.get()) {
+                   Thread.sleep(10);
+                   return ConsumerRecords.empty();
+               }
                this.pollLatch.countDown();
                switch (which.getAndIncrement()) {
                    case 0:
@@ -210,9 +220,20 @@ public Consumer consumer() {
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
-               return new ConsumerRecords(Collections.emptyMap());
+               return ConsumerRecords.empty();
                }
            }).given(consumer).poll(Duration.ofMillis(ContainerProperties.DEFAULT_POLL_TIMEOUT));
+           willAnswer(i -> {
+               return Collections.emptySet();
+           }).given(consumer).paused();
+           willAnswer(i -> {
+               paused.set(true);
+               return null;
+           }).given(consumer).pause(any());
+           willAnswer(i -> {
+               paused.set(false);
+               return null;
+           }).given(consumer).resume(any());
            willAnswer(i -> {
                this.commitLatch.countDown();
                return null;
