diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index 3c8207a69f..1f243bc76e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -2769,7 +2769,6 @@ public void rePausePartitionAfterRebalance() throws Exception { rebal.get().onPartitionsAssigned(Set.of(tp0, tp1)); return null; }).given(consumer).subscribe(eq(foos), any(ConsumerRebalanceListener.class)); - final CountDownLatch resumeLatch = new CountDownLatch(1); ContainerProperties containerProps = new ContainerProperties("foo"); containerProps.setGroupId("grp"); containerProps.setAckMode(AckMode.RECORD); @@ -2780,7 +2779,6 @@ public void rePausePartitionAfterRebalance() throws Exception { KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); - InOrder inOrder = inOrder(consumer); assertThat(firstPoll.await(10, TimeUnit.SECONDS)).isNotNull(); container.pausePartition(tp0); container.pausePartition(tp1); @@ -2811,7 +2809,6 @@ public void resumePartitionAfterRevokeAndReAssign() throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class); given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer); - AtomicBoolean first = new AtomicBoolean(true); TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); given(consumer.assignment()).willReturn(Set.of(tp0, tp1)); @@ -3462,7 +3459,6 @@ public void testCooperativeRebalance() throws Exception { containerProps.setGroupId("grp"); containerProps.setClientId("clientId"); containerProps.setMessageListener((MessageListener) msg -> { }); - Properties consumerProps = new Properties(); KafkaMessageListenerContainer container = new KafkaMessageListenerContainer<>(cf, containerProps); container.start(); @@ -3606,7 +3602,6 @@ else if (call == 1) { }).given(consumer).subscribe(any(Collection.class), any(ConsumerRebalanceListener.class)); List> commits = new ArrayList<>(); AtomicBoolean firstCommit = new AtomicBoolean(true); - AtomicInteger commitCount = new AtomicInteger(); willAnswer(invoc -> { commits.add(invoc.getArgument(0, Map.class)); if (!firstCommit.getAndSet(false)) { @@ -3888,6 +3883,11 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early latch.countDown(); return null; }).given(consumer).commitSync(any(), any()); + CountDownLatch closeLatch = new CountDownLatch(1); + willAnswer(inv -> { + closeLatch.countDown(); + return null; + }).given(consumer).close(); TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] { new TopicPartitionOffset("foo", 0) }; @@ -3902,6 +3902,7 @@ public void testInvokeRecordInterceptorAllSkipped(AckMode ackMode, boolean early containerProps.setKafkaAwareTransactionManager(mock(KafkaAwareTransactionManager.class)); } + CountDownLatch afterRecordLatch = new CountDownLatch(2); RecordInterceptor recordInterceptor = spy(new RecordInterceptor() { @Override @@ -3912,6 +3913,10 @@ public ConsumerRecord intercept(ConsumerRecord return null; } + public void afterRecord(ConsumerRecord record, Consumer consumer) { + afterRecordLatch.countDown(); + } + }); KafkaMessageListenerContainer container = @@ -3920,6 +3925,9 @@ public ConsumerRecord intercept(ConsumerRecord container.setInterceptBeforeTx(early); container.start(); assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(afterRecordLatch.await(10, TimeUnit.SECONDS)).isTrue(); + container.stop(); + assertThat(closeLatch.await(10, TimeUnit.SECONDS)).isTrue(); InOrder inOrder = inOrder(recordInterceptor, consumer); inOrder.verify(recordInterceptor).setupThreadState(eq(consumer)); @@ -3946,12 +3954,12 @@ public ConsumerRecord intercept(ConsumerRecord inOrder.verify(consumer).commitSync(eq(Map.of(new TopicPartition("foo", 0), new OffsetAndMetadata(2L))), any(Duration.class)); } - container.stop(); + inOrder.verify(consumer).close(); } @ParameterizedTest(name = "{index} testInvokeBatchInterceptorAllSkipped early intercept {0}") @ValueSource(booleans = { true, false }) - @SuppressWarnings({ "unchecked", "deprecation" }) + @SuppressWarnings("unchecked") public void testInvokeBatchInterceptorAllSkipped(boolean early) throws Exception { ConsumerFactory cf = mock(ConsumerFactory.class); Consumer consumer = mock(Consumer.class);