Skip to content

Commit feb3563

Browse files
garyrussellartembilan
authored andcommitted
GH-2222: Re-Pause Paused Partitions After Rebal.
Resolves #2222 Re-pause any paused partitions that are re-assigned after a rebalance, Remove any partitions that are revoked and not re-assigned from the paused partitions collections (pause requests and actually paused).
1 parent e1e46c6 commit feb3563

File tree

2 files changed

+94
-10
lines changed

2 files changed

+94
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -691,7 +691,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
691691

692692
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
693693

694-
private final Set<TopicPartition> pausedPartitions;
694+
private final Set<TopicPartition> pausedPartitions = new HashSet<>();
695695

696696
private final Map<TopicPartition, List<Long>> offsetsInThisBatch =
697697
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
@@ -838,7 +838,6 @@ else if (listener instanceof MessageListener) {
838838
this.lastReceivePartition = new HashMap<>();
839839
this.lastAlertPartition = new HashMap<>();
840840
this.wasIdlePartition = new HashMap<>();
841-
this.pausedPartitions = new HashSet<>();
842841
}
843842

844843
@Nullable
@@ -3224,11 +3223,14 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
32243223
this.userListener instanceof ConsumerAwareRebalanceListener
32253224
? (ConsumerAwareRebalanceListener) this.userListener : null;
32263225

3226+
private final Collection<TopicPartition> revoked = new LinkedList<>();
3227+
32273228
ListenerConsumerRebalanceListener() {
32283229
}
32293230

32303231
@Override
32313232
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
3233+
this.revoked.addAll(partitions);
32323234
if (this.consumerAwareListener != null) {
32333235
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
32343236
partitions);
@@ -3269,14 +3271,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
32693271

32703272
@Override
32713273
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
3272-
if (ListenerConsumer.this.consumerPaused) {
3273-
ListenerConsumer.this.consumer.pause(partitions);
3274-
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
3275-
+ "consumer paused again, so the initial poll() will never return any records");
3276-
}
3277-
if (ListenerConsumer.this.pausedForNack.size() > 0) {
3278-
ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
3279-
}
3274+
repauseIfNeeded(partitions);
32803275
ListenerConsumer.this.assignedPartitions.addAll(partitions);
32813276
if (ListenerConsumer.this.commitCurrentOnAssignment
32823277
&& !collectAndCommitIfNecessary(partitions)) {
@@ -3293,6 +3288,30 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
32933288
}
32943289
}
32953290

3291+
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3292+
if (ListenerConsumer.this.consumerPaused) {
3293+
ListenerConsumer.this.consumer.pause(partitions);
3294+
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
3295+
+ "consumer paused again, so the initial poll() will never return any records");
3296+
}
3297+
Collection<TopicPartition> toRepause = new LinkedList<>();
3298+
partitions.forEach(tp -> {
3299+
if (isPartitionPauseRequested(tp)) {
3300+
toRepause.add(tp);
3301+
}
3302+
});
3303+
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
3304+
ListenerConsumer.this.consumer.pause(toRepause);
3305+
}
3306+
this.revoked.removeAll(toRepause);
3307+
this.revoked.forEach(tp -> resumePartition(tp));
3308+
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
3309+
this.revoked.clear();
3310+
if (ListenerConsumer.this.pausedForNack.size() > 0) {
3311+
ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
3312+
}
3313+
}
3314+
32963315
private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
32973316
// Commit initial positions - this is generally redundant but
32983317
// it protects us from the case when another consumer starts

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.kafka.common.errors.TopicAuthorizationException;
8181
import org.apache.kafka.common.errors.WakeupException;
8282
import org.apache.kafka.common.serialization.IntegerDeserializer;
83+
import org.assertj.core.api.InstanceOfAssertFactories;
8384
import org.junit.jupiter.api.BeforeAll;
8485
import org.junit.jupiter.api.Test;
8586
import org.mockito.ArgumentCaptor;
@@ -2761,6 +2762,70 @@ public void dontResumePausedPartition() throws Exception {
27612762
container.stop();
27622763
}
27632764

2765+
@SuppressWarnings({ "unchecked" })
2766+
@Test
2767+
public void rePausePartitionAfterRebalance() throws Exception {
2768+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2769+
Consumer<Integer, String> consumer = mock(Consumer.class);
2770+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2771+
AtomicBoolean first = new AtomicBoolean(true);
2772+
TopicPartition tp0 = new TopicPartition("foo", 0);
2773+
TopicPartition tp1 = new TopicPartition("foo", 1);
2774+
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
2775+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2776+
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2777+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2778+
willAnswer(i -> {
2779+
pausedParts.clear();
2780+
pausedParts.addAll(i.getArgument(0));
2781+
pauseLatch1.countDown();
2782+
pauseLatch2.countDown();
2783+
return null;
2784+
}).given(consumer).pause(any());
2785+
given(consumer.paused()).willReturn(pausedParts);
2786+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2787+
Thread.sleep(50);
2788+
return ConsumerRecords.empty();
2789+
});
2790+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2791+
Collection<String> foos = new ArrayList<>();
2792+
foos.add("foo");
2793+
willAnswer(inv -> {
2794+
rebal.set(inv.getArgument(1));
2795+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2796+
return null;
2797+
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
2798+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2799+
ContainerProperties containerProps = new ContainerProperties("foo");
2800+
containerProps.setGroupId("grp");
2801+
containerProps.setAckMode(AckMode.RECORD);
2802+
containerProps.setClientId("clientId");
2803+
containerProps.setIdleEventInterval(100L);
2804+
containerProps.setMessageListener((MessageListener) rec -> { });
2805+
containerProps.setMissingTopicsFatal(false);
2806+
KafkaMessageListenerContainer<Integer, String> container =
2807+
new KafkaMessageListenerContainer<>(cf, containerProps);
2808+
container.start();
2809+
InOrder inOrder = inOrder(consumer);
2810+
container.pausePartition(tp0);
2811+
container.pausePartition(tp1);
2812+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2813+
assertThat(pausedParts).hasSize(2)
2814+
.contains(tp0, tp1);
2815+
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
2816+
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2817+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
2818+
assertThat(pausedParts).hasSize(1)
2819+
.contains(tp0);
2820+
assertThat(container).extracting("listenerConsumer")
2821+
.extracting("pausedPartitions")
2822+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2823+
.hasSize(1)
2824+
.containsExactlyInAnyOrder(tp0);
2825+
2826+
container.stop();
2827+
}
2828+
27642829
@SuppressWarnings({ "unchecked", "rawtypes" })
27652830
@Test
27662831
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)