Skip to content

Commit cd43976

Browse files
committed
GH-2222: Re-Pause Paused Partitions After Rebal.
Resolves #2222 Re-pause any paused partitions that are re-assigned after a rebalance, Remove any partitions that are revoked and not re-assigned from the paused partitions collections (pause requests and actually paused).
1 parent c3d9779 commit cd43976

File tree

3 files changed

+99
-15
lines changed

3 files changed

+99
-15
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ ext {
5050
files(grgit.status().unstaged.modified).filter{ f -> f.name.endsWith('.java') || f.name.endsWith('.kt') }
5151
}
5252

53-
assertjVersion = '3.19.0'
53+
assertjVersion = '3.21.0'
5454
awaitilityVersion = '4.0.3'
5555
googleJsr305Version = '3.0.2'
5656
hamcrestVersion = '2.2'

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -662,6 +662,14 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
662662

663663
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
664664

665+
private final Set<TopicPartition> pausedPartitions = new HashSet<>();
666+
667+
private final Map<TopicPartition, Long> lastReceivePartition;
668+
669+
private final Map<TopicPartition, Long> lastAlertPartition;
670+
671+
private final Map<TopicPartition, Boolean> wasIdlePartition;
672+
665673
private Map<TopicPartition, OffsetMetadata> definedPartitions;
666674

667675
private int count;
@@ -676,10 +684,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
676684

677685
private long lastAlertAt = this.lastReceive;
678686

679-
private final Map<TopicPartition, Long> lastReceivePartition;
680-
681-
private final Map<TopicPartition, Long> lastAlertPartition;
682-
683687
private long nackSleep = -1;
684688

685689
private int nackIndex;
@@ -696,8 +700,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
696700

697701
private boolean wasIdle;
698702

699-
private final Map<TopicPartition, Boolean> wasIdlePartition;
700-
701703
private boolean batchFailed;
702704

703705
private volatile boolean consumerPaused;
@@ -706,8 +708,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
706708

707709
private volatile long lastPoll = System.currentTimeMillis();
708710

709-
private final Set<TopicPartition> pausedPartitions;
710-
711711
@SuppressWarnings(UNCHECKED)
712712
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
713713
Properties consumerProperties = propertiesFromProperties();
@@ -795,7 +795,6 @@ else if (listener instanceof MessageListener) {
795795
this.lastReceivePartition = new HashMap<>();
796796
this.lastAlertPartition = new HashMap<>();
797797
this.wasIdlePartition = new HashMap<>();
798-
this.pausedPartitions = new HashSet<>();
799798
}
800799

801800
private Properties propertiesFromProperties() {
@@ -2918,12 +2917,15 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
29182917
this.userListener instanceof ConsumerAwareRebalanceListener
29192918
? (ConsumerAwareRebalanceListener) this.userListener : null;
29202919

2920+
private final Collection<TopicPartition> revoked = new LinkedList<>();
2921+
29212922
ListenerConsumerRebalanceListener() {
29222923
}
29232924

29242925
@Override
29252926
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
29262927
try {
2928+
this.revoked.addAll(partitions);
29272929
if (this.consumerAwareListener != null) {
29282930
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
29292931
partitions);
@@ -2961,11 +2963,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
29612963

29622964
@Override
29632965
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
2964-
if (ListenerConsumer.this.consumerPaused) {
2965-
ListenerConsumer.this.consumer.pause(partitions);
2966-
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
2967-
+ "consumer paused again, so the initial poll() will never return any records");
2968-
}
2966+
repauseIfNeeded(partitions);
29692967
ListenerConsumer.this.assignedPartitions.addAll(partitions);
29702968
if (ListenerConsumer.this.commitCurrentOnAssignment
29712969
&& !collectAndCommitIfNecessary(partitions)) {
@@ -2982,6 +2980,27 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
29822980
}
29832981
}
29842982

2983+
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
2984+
if (ListenerConsumer.this.consumerPaused) {
2985+
ListenerConsumer.this.consumer.pause(partitions);
2986+
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
2987+
+ "consumer paused again, so the initial poll() will never return any records");
2988+
}
2989+
Collection<TopicPartition> toRepause = new LinkedList<>();
2990+
partitions.forEach(tp -> {
2991+
if (isPartitionPauseRequested(tp)) {
2992+
toRepause.add(tp);
2993+
}
2994+
});
2995+
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
2996+
ListenerConsumer.this.consumer.pause(toRepause);
2997+
}
2998+
this.revoked.removeAll(toRepause);
2999+
this.revoked.forEach(tp -> resumePartition(tp));
3000+
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
3001+
this.revoked.clear();
3002+
}
3003+
29853004
private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
29863005
// Commit initial positions - this is generally redundant but
29873006
// it protects us from the case when another consumer starts

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import org.apache.kafka.common.errors.TopicAuthorizationException;
8080
import org.apache.kafka.common.errors.WakeupException;
8181
import org.apache.kafka.common.serialization.IntegerDeserializer;
82+
import org.assertj.core.api.InstanceOfAssertFactories;
8283
import org.junit.jupiter.api.BeforeAll;
8384
import org.junit.jupiter.api.Test;
8485
import org.mockito.ArgumentCaptor;
@@ -2532,6 +2533,70 @@ public void dontResumePausedPartition() throws Exception {
25322533
container.stop();
25332534
}
25342535

2536+
@SuppressWarnings({ "unchecked" })
2537+
@Test
2538+
public void rePausePartitionAfterRebalance() throws Exception {
2539+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2540+
Consumer<Integer, String> consumer = mock(Consumer.class);
2541+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2542+
AtomicBoolean first = new AtomicBoolean(true);
2543+
TopicPartition tp0 = new TopicPartition("foo", 0);
2544+
TopicPartition tp1 = new TopicPartition("foo", 1);
2545+
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
2546+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2547+
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2548+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2549+
willAnswer(i -> {
2550+
pausedParts.clear();
2551+
pausedParts.addAll(i.getArgument(0));
2552+
pauseLatch1.countDown();
2553+
pauseLatch2.countDown();
2554+
return null;
2555+
}).given(consumer).pause(any());
2556+
given(consumer.paused()).willReturn(pausedParts);
2557+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2558+
Thread.sleep(50);
2559+
return ConsumerRecords.empty();
2560+
});
2561+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2562+
Collection<String> foos = new ArrayList<>();
2563+
foos.add("foo");
2564+
willAnswer(inv -> {
2565+
rebal.set(inv.getArgument(1));
2566+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2567+
return null;
2568+
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
2569+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2570+
ContainerProperties containerProps = new ContainerProperties("foo");
2571+
containerProps.setGroupId("grp");
2572+
containerProps.setAckMode(AckMode.RECORD);
2573+
containerProps.setClientId("clientId");
2574+
containerProps.setIdleEventInterval(100L);
2575+
containerProps.setMessageListener((MessageListener) rec -> { });
2576+
containerProps.setMissingTopicsFatal(false);
2577+
KafkaMessageListenerContainer<Integer, String> container =
2578+
new KafkaMessageListenerContainer<>(cf, containerProps);
2579+
container.start();
2580+
InOrder inOrder = inOrder(consumer);
2581+
container.pausePartition(tp0);
2582+
container.pausePartition(tp1);
2583+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2584+
assertThat(pausedParts).hasSize(2)
2585+
.contains(tp0, tp1);
2586+
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
2587+
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2588+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
2589+
assertThat(pausedParts).hasSize(1)
2590+
.contains(tp0);
2591+
assertThat(container).extracting("listenerConsumer")
2592+
.extracting("pausedPartitions")
2593+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2594+
.hasSize(1)
2595+
.containsExactlyInAnyOrder(tp0);
2596+
2597+
container.stop();
2598+
}
2599+
25352600
@SuppressWarnings({ "unchecked", "rawtypes" })
25362601
@Test
25372602
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)