Skip to content

Commit 7103483

Browse files
committed
GH-2222: Re-Pause Paused Partitions After Rebal.
Resolves #2222 Re-pause any paused partitions that are re-assigned after a rebalance, Remove any partitions that are revoked and not re-assigned from the paused partitions collections (pause requests and actually paused).
1 parent 4154271 commit 7103483

File tree

2 files changed

+94
-10
lines changed

2 files changed

+94
-10
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
695695

696696
private final boolean stopImmediate = this.containerProperties.isStopImmediate();
697697

698-
private final Set<TopicPartition> pausedPartitions;
698+
private final Set<TopicPartition> pausedPartitions = new HashSet<>();
699699

700700
private final Map<TopicPartition, List<Long>> offsetsInThisBatch =
701701
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
@@ -844,7 +844,6 @@ else if (listener instanceof MessageListener) {
844844
this.lastReceivePartition = new HashMap<>();
845845
this.lastAlertPartition = new HashMap<>();
846846
this.wasIdlePartition = new HashMap<>();
847-
this.pausedPartitions = new HashSet<>();
848847
}
849848

850849
@Nullable
@@ -3291,12 +3290,15 @@ private class ListenerConsumerRebalanceListener implements ConsumerRebalanceList
32913290
this.userListener instanceof ConsumerAwareRebalanceListener
32923291
? (ConsumerAwareRebalanceListener) this.userListener : null;
32933292

3293+
private final Collection<TopicPartition> revoked = new LinkedList<>();
3294+
32943295
ListenerConsumerRebalanceListener() {
32953296
}
32963297

32973298
@Override
32983299
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
32993300
try {
3301+
this.revoked.addAll(partitions);
33003302
if (this.consumerAwareListener != null) {
33013303
this.consumerAwareListener.onPartitionsRevokedBeforeCommit(ListenerConsumer.this.consumer,
33023304
partitions);
@@ -3343,14 +3345,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
33433345

33443346
@Override
33453347
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
3346-
if (ListenerConsumer.this.consumerPaused) {
3347-
ListenerConsumer.this.consumer.pause(partitions);
3348-
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
3349-
+ "consumer paused again, so the initial poll() will never return any records");
3350-
}
3351-
if (ListenerConsumer.this.pausedForNack.size() > 0) {
3352-
ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
3353-
}
3348+
repauseIfNeeded(partitions);
33543349
ListenerConsumer.this.assignedPartitions.addAll(partitions);
33553350
if (ListenerConsumer.this.commitCurrentOnAssignment
33563351
&& !collectAndCommitIfNecessary(partitions)) {
@@ -3367,6 +3362,30 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
33673362
}
33683363
}
33693364

3365+
private void repauseIfNeeded(Collection<TopicPartition> partitions) {
3366+
if (ListenerConsumer.this.consumerPaused) {
3367+
ListenerConsumer.this.consumer.pause(partitions);
3368+
ListenerConsumer.this.logger.warn("Paused consumer resumed by Kafka due to rebalance; "
3369+
+ "consumer paused again, so the initial poll() will never return any records");
3370+
}
3371+
Collection<TopicPartition> toRepause = new LinkedList<>();
3372+
partitions.forEach(tp -> {
3373+
if (isPartitionPauseRequested(tp)) {
3374+
toRepause.add(tp);
3375+
}
3376+
});
3377+
if (!ListenerConsumer.this.consumerPaused && toRepause.size() > 0) {
3378+
ListenerConsumer.this.consumer.pause(toRepause);
3379+
}
3380+
this.revoked.removeAll(toRepause);
3381+
this.revoked.forEach(tp -> resumePartition(tp));
3382+
ListenerConsumer.this.pausedPartitions.removeAll(this.revoked);
3383+
this.revoked.clear();
3384+
if (ListenerConsumer.this.pausedForNack.size() > 0) {
3385+
ListenerConsumer.this.consumer.pause(ListenerConsumer.this.pausedForNack);
3386+
}
3387+
}
3388+
33703389
private boolean collectAndCommitIfNecessary(Collection<TopicPartition> partitions) {
33713390
// Commit initial positions - this is generally redundant but
33723391
// it protects us from the case when another consumer starts

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.apache.kafka.common.errors.TopicAuthorizationException;
8181
import org.apache.kafka.common.errors.WakeupException;
8282
import org.apache.kafka.common.serialization.IntegerDeserializer;
83+
import org.assertj.core.api.InstanceOfAssertFactories;
8384
import org.junit.jupiter.api.BeforeAll;
8485
import org.junit.jupiter.api.Test;
8586
import org.mockito.ArgumentCaptor;
@@ -2755,6 +2756,70 @@ public void dontResumePausedPartition() throws Exception {
27552756
container.stop();
27562757
}
27572758

2759+
@SuppressWarnings({ "unchecked" })
2760+
@Test
2761+
public void rePausePartitionAfterRebalance() throws Exception {
2762+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2763+
Consumer<Integer, String> consumer = mock(Consumer.class);
2764+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2765+
AtomicBoolean first = new AtomicBoolean(true);
2766+
TopicPartition tp0 = new TopicPartition("foo", 0);
2767+
TopicPartition tp1 = new TopicPartition("foo", 1);
2768+
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
2769+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2770+
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2771+
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2772+
willAnswer(i -> {
2773+
pausedParts.clear();
2774+
pausedParts.addAll(i.getArgument(0));
2775+
pauseLatch1.countDown();
2776+
pauseLatch2.countDown();
2777+
return null;
2778+
}).given(consumer).pause(any());
2779+
given(consumer.paused()).willReturn(pausedParts);
2780+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2781+
Thread.sleep(50);
2782+
return ConsumerRecords.empty();
2783+
});
2784+
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2785+
Collection<String> foos = new ArrayList<>();
2786+
foos.add("foo");
2787+
willAnswer(inv -> {
2788+
rebal.set(inv.getArgument(1));
2789+
rebal.get().onPartitionsAssigned(Set.of(tp0, tp1));
2790+
return null;
2791+
}).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class));
2792+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2793+
ContainerProperties containerProps = new ContainerProperties("foo");
2794+
containerProps.setGroupId("grp");
2795+
containerProps.setAckMode(AckMode.RECORD);
2796+
containerProps.setClientId("clientId");
2797+
containerProps.setIdleEventInterval(100L);
2798+
containerProps.setMessageListener((MessageListener) rec -> { });
2799+
containerProps.setMissingTopicsFatal(false);
2800+
KafkaMessageListenerContainer<Integer, String> container =
2801+
new KafkaMessageListenerContainer<>(cf, containerProps);
2802+
container.start();
2803+
InOrder inOrder = inOrder(consumer);
2804+
container.pausePartition(tp0);
2805+
container.pausePartition(tp1);
2806+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2807+
assertThat(pausedParts).hasSize(2)
2808+
.contains(tp0, tp1);
2809+
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
2810+
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2811+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
2812+
assertThat(pausedParts).hasSize(1)
2813+
.contains(tp0);
2814+
assertThat(container).extracting("listenerConsumer")
2815+
.extracting("pausedPartitions")
2816+
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
2817+
.hasSize(1)
2818+
.containsExactlyInAnyOrder(tp0);
2819+
2820+
container.stop();
2821+
}
2822+
27582823
@SuppressWarnings({ "unchecked", "rawtypes" })
27592824
@Test
27602825
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)