Skip to content

Commit ccb8b80

Browse files
authored
GH-1866: Fix Pause/Resume
Resolves #1866 The new retryable topic feature pauses/resumes individual partitions. This broke normal container pause/resume by incorrectly resuming partitions that were paused by the container pause operation. Similarly, if individual partitions were paused and then the container was paused and resumed, the container resumed all partitions. Decouple the functionality to prevent this cross-talk. Do not resume any individually paused partitions when the container is in a paused state. Do not resume any individually paused partitions when the container is resumed. Also Use a `ConcurrentHashMap.newKeySet()` instead of synchronization on partition pause requests. Use `getAssignedPartitions()` to allow the retry topic feature to work with manual assignments. Add tests to verify no cross-talk between pausing individual partitions and the container. * Fix race in test.
1 parent e29ec92 commit ccb8b80

File tree

3 files changed

+84
-27
lines changed

3 files changed

+84
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,11 @@
1818

1919
import java.util.Arrays;
2020
import java.util.Collection;
21-
import java.util.HashSet;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Map.Entry;
2524
import java.util.Set;
25+
import java.util.concurrent.ConcurrentHashMap;
2626
import java.util.concurrent.CountDownLatch;
2727
import java.util.concurrent.TimeUnit;
2828
import java.util.regex.Pattern;
@@ -109,7 +109,7 @@ public abstract class AbstractMessageListenerContainer<K, V>
109109

110110
private ApplicationContext applicationContext;
111111

112-
private final Set<TopicPartition> pauseRequestedPartitions;
112+
private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
113113

114114
/**
115115
* Construct an instance with the provided factory and properties.
@@ -159,8 +159,6 @@ protected AbstractMessageListenerContainer(ConsumerFactory<? super K, ? super V>
159159
if (this.containerProperties.getConsumerRebalanceListener() == null) {
160160
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
161161
}
162-
163-
this.pauseRequestedPartitions = new HashSet<>();
164162
}
165163

166164
@Override
@@ -263,23 +261,17 @@ protected boolean isPaused() {
263261

264262
@Override
265263
public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
266-
synchronized (this.pauseRequestedPartitions) {
267-
return this.pauseRequestedPartitions.contains(topicPartition);
268-
}
264+
return this.pauseRequestedPartitions.contains(topicPartition);
269265
}
270266

271267
@Override
272268
public void pausePartition(TopicPartition topicPartition) {
273-
synchronized (this.pauseRequestedPartitions) {
274-
this.pauseRequestedPartitions.add(topicPartition);
275-
}
269+
this.pauseRequestedPartitions.add(topicPartition);
276270
}
277271

278272
@Override
279273
public void resumePartition(TopicPartition topicPartition) {
280-
synchronized (this.pauseRequestedPartitions) {
281-
this.pauseRequestedPartitions.remove(topicPartition);
282-
}
274+
this.pauseRequestedPartitions.remove(topicPartition);
283275
}
284276

285277
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1270,7 +1270,9 @@ protected void pollAndInvoke() {
12701270
}
12711271
debugRecords(records);
12721272
resumeConsumerIfNeccessary();
1273-
resumePartitionsIfNecessary();
1273+
if (!this.consumerPaused) {
1274+
resumePartitionsIfNecessary();
1275+
}
12741276

12751277
invokeIfHaveRecords(records);
12761278
}
@@ -1522,7 +1524,8 @@ private void doResumeConsumerIfNeccessary() {
15221524
}
15231525
if (this.consumerPaused && !isPaused() && !this.pausedForAsyncAcks) {
15241526
this.logger.debug(() -> "Resuming consumption from: " + this.consumer.paused());
1525-
Set<TopicPartition> paused = this.consumer.paused();
1527+
Collection<TopicPartition> paused = new LinkedList<>(this.consumer.paused());
1528+
paused.removeAll(this.pausedPartitions);
15261529
this.consumer.resume(paused);
15271530
this.consumerPaused = false;
15281531
publishConsumerResumedEvent(paused);
@@ -1531,8 +1534,7 @@ private void doResumeConsumerIfNeccessary() {
15311534

15321535
private void pausePartitionsIfNecessary() {
15331536
Set<TopicPartition> pausedConsumerPartitions = this.consumer.paused();
1534-
List<TopicPartition> partitionsToPause = this
1535-
.assignedPartitions
1537+
List<TopicPartition> partitionsToPause = getAssignedPartitions()
15361538
.stream()
15371539
.filter(tp -> isPartitionPauseRequested(tp)
15381540
&& !pausedConsumerPartitions.contains(tp))

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 73 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import java.util.Map;
4848
import java.util.Map.Entry;
4949
import java.util.Properties;
50+
import java.util.Set;
5051
import java.util.concurrent.CountDownLatch;
5152
import java.util.concurrent.Executors;
5253
import java.util.concurrent.TimeUnit;
@@ -2553,14 +2554,6 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25532554
AtomicBoolean first = new AtomicBoolean(true);
25542555
AtomicBoolean rebalance = new AtomicBoolean(true);
25552556
AtomicReference<ConsumerRebalanceListener> rebal = new AtomicReference<>();
2556-
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2557-
Thread.sleep(50);
2558-
if (rebalance.getAndSet(false)) {
2559-
rebal.get().onPartitionsRevoked(Collections.emptyList());
2560-
rebal.get().onPartitionsAssigned(records.keySet());
2561-
}
2562-
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2563-
});
25642557
final CountDownLatch seekLatch = new CountDownLatch(7);
25652558
willAnswer(i -> {
25662559
seekLatch.countDown();
@@ -2569,17 +2562,32 @@ public void testPauseResumeAndConsumerSeekAware() throws Exception {
25692562
given(consumer.assignment()).willReturn(records.keySet());
25702563
final CountDownLatch pauseLatch1 = new CountDownLatch(2); // consumer, event publisher
25712564
final CountDownLatch pauseLatch2 = new CountDownLatch(2); // consumer, consumer
2565+
Set<TopicPartition> pausedParts = new HashSet<>();
25722566
willAnswer(i -> {
25732567
pauseLatch1.countDown();
25742568
pauseLatch2.countDown();
2569+
pausedParts.addAll(i.getArgument(0));
25752570
return null;
25762571
}).given(consumer).pause(records.keySet());
2577-
given(consumer.paused()).willReturn(records.keySet());
2572+
given(consumer.paused()).willReturn(pausedParts);
2573+
CountDownLatch pollWhilePausedLatch = new CountDownLatch(2);
2574+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2575+
Thread.sleep(50);
2576+
if (pauseLatch1.getCount() == 0) {
2577+
pollWhilePausedLatch.countDown();
2578+
}
2579+
if (rebalance.getAndSet(false)) {
2580+
rebal.get().onPartitionsRevoked(Collections.emptyList());
2581+
rebal.get().onPartitionsAssigned(records.keySet());
2582+
}
2583+
return first.getAndSet(false) ? consumerRecords : emptyRecords;
2584+
});
25782585
final CountDownLatch resumeLatch = new CountDownLatch(2);
25792586
willAnswer(i -> {
25802587
resumeLatch.countDown();
2588+
pausedParts.removeAll(i.getArgument(0));
25812589
return null;
2582-
}).given(consumer).resume(records.keySet());
2590+
}).given(consumer).resume(any());
25832591
willAnswer(invoc -> {
25842592
rebal.set(invoc.getArgument(1));
25852593
return null;
@@ -2671,6 +2679,8 @@ else if (e instanceof ConsumerStoppedEvent) {
26712679
assertThat(container.isPaused()).isTrue();
26722680
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
26732681
assertThat(container.isContainerPaused()).isTrue();
2682+
assertThat(pollWhilePausedLatch.await(10, TimeUnit.SECONDS)).isTrue();
2683+
verify(consumer, never()).resume(any());
26742684
rebalance.set(true); // force a re-pause
26752685
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
26762686
container.resume();
@@ -2680,6 +2690,59 @@ else if (e instanceof ConsumerStoppedEvent) {
26802690
verify(consumer, times(6)).commitSync(anyMap(), eq(Duration.ofSeconds(41)));
26812691
}
26822692

2693+
@SuppressWarnings({ "unchecked" })
2694+
@Test
2695+
public void dontResumePausedPartition() throws Exception {
2696+
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
2697+
Consumer<Integer, String> consumer = mock(Consumer.class);
2698+
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
2699+
ConsumerRecords<Integer, String> emptyRecords = new ConsumerRecords<>(Collections.emptyMap());
2700+
AtomicBoolean first = new AtomicBoolean(true);
2701+
given(consumer.assignment()).willReturn(Set.of(new TopicPartition("foo", 0), new TopicPartition("foo", 1)));
2702+
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2703+
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2704+
Set<TopicPartition> pausedParts = new HashSet<>();
2705+
willAnswer(i -> {
2706+
pausedParts.addAll(i.getArgument(0));
2707+
pauseLatch1.countDown();
2708+
pauseLatch2.countDown();
2709+
return null;
2710+
}).given(consumer).pause(any());
2711+
given(consumer.paused()).willReturn(pausedParts);
2712+
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2713+
Thread.sleep(50);
2714+
return emptyRecords;
2715+
});
2716+
final CountDownLatch resumeLatch = new CountDownLatch(1);
2717+
willAnswer(i -> {
2718+
resumeLatch.countDown();
2719+
pausedParts.removeAll(i.getArgument(0));
2720+
return null;
2721+
}).given(consumer).resume(any());
2722+
ContainerProperties containerProps = new ContainerProperties(new TopicPartitionOffset("foo", 0),
2723+
new TopicPartitionOffset("foo", 1));
2724+
containerProps.setGroupId("grp");
2725+
containerProps.setAckMode(AckMode.RECORD);
2726+
containerProps.setClientId("clientId");
2727+
containerProps.setIdleEventInterval(100L);
2728+
containerProps.setMessageListener((MessageListener) rec -> { });
2729+
containerProps.setMissingTopicsFatal(false);
2730+
KafkaMessageListenerContainer<Integer, String> container =
2731+
new KafkaMessageListenerContainer<>(cf, containerProps);
2732+
container.start();
2733+
InOrder inOrder = inOrder(consumer);
2734+
container.pausePartition(new TopicPartition("foo", 1));
2735+
assertThat(pauseLatch1.await(10, TimeUnit.SECONDS)).isTrue();
2736+
assertThat(pausedParts).hasSize(1);
2737+
container.pause();
2738+
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
2739+
assertThat(pausedParts).hasSize(2);
2740+
container.resume();
2741+
assertThat(resumeLatch.await(10, TimeUnit.SECONDS)).isTrue();
2742+
assertThat(pausedParts).hasSize(1);
2743+
container.stop();
2744+
}
2745+
26832746
@SuppressWarnings({ "unchecked", "rawtypes" })
26842747
@Test
26852748
public void testInitialSeek() throws Exception {

0 commit comments

Comments
 (0)