Skip to content

Commit c863597

Browse files
committed
GH-2222: Fix Race in Test
1 parent 3903d1a commit c863597

File tree

1 file changed

+13
-6
lines changed

1 file changed

+13
-6
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2767,17 +2767,25 @@ public void rePausePartitionAfterRebalance() throws Exception {
27672767
TopicPartition tp1 = new TopicPartition("foo", 1);
27682768
given(consumer.assignment()).willReturn(Set.of(tp0, tp1));
27692769
final CountDownLatch pauseLatch1 = new CountDownLatch(1);
2770-
final CountDownLatch pauseLatch2 = new CountDownLatch(2);
2770+
final CountDownLatch suspendConsumerThread = new CountDownLatch(1);
27712771
Set<TopicPartition> pausedParts = ConcurrentHashMap.newKeySet();
2772+
Thread testThread = Thread.currentThread();
2773+
AtomicBoolean paused = new AtomicBoolean();
27722774
willAnswer(i -> {
27732775
pausedParts.clear();
27742776
pausedParts.addAll(i.getArgument(0));
2775-
pauseLatch1.countDown();
2776-
pauseLatch2.countDown();
2777+
if (!Thread.currentThread().equals(testThread)) {
2778+
paused.set(true);
2779+
}
27772780
return null;
27782781
}).given(consumer).pause(any());
27792782
given(consumer.paused()).willReturn(pausedParts);
27802783
given(consumer.poll(any(Duration.class))).willAnswer(i -> {
2784+
if (paused.get()) {
2785+
pauseLatch1.countDown();
2786+
// hold up the consumer thread while we revoke/assign partitions on the test thread
2787+
suspendConsumerThread.await(10, TimeUnit.SECONDS);
2788+
}
27812789
Thread.sleep(50);
27822790
return ConsumerRecords.empty();
27832791
});
@@ -2808,15 +2816,14 @@ public void rePausePartitionAfterRebalance() throws Exception {
28082816
.contains(tp0, tp1);
28092817
rebal.get().onPartitionsRevoked(Set.of(tp0, tp1));
28102818
rebal.get().onPartitionsAssigned(Collections.singleton(tp0));
2811-
assertThat(pauseLatch2.await(10, TimeUnit.SECONDS)).isTrue();
28122819
assertThat(pausedParts).hasSize(1)
28132820
.contains(tp0);
28142821
assertThat(container).extracting("listenerConsumer")
28152822
.extracting("pausedPartitions")
28162823
.asInstanceOf(InstanceOfAssertFactories.collection(TopicPartition.class))
28172824
.hasSize(1)
2818-
.containsExactlyInAnyOrder(tp0);
2819-
2825+
.contains(tp0);
2826+
suspendConsumerThread.countDown();
28202827
container.stop();
28212828
}
28222829

0 commit comments

Comments
 (0)