diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java index 2b82126b81..b8f1c81dff 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,7 +30,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -130,7 +129,7 @@ else if (event instanceof ConsumerFailedToStartEvent) { exec.destroy(); } - @SuppressWarnings({ "rawtypes", "unchecked", "deprecation" }) + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test void testCorrectContainerForConsumerError() throws InterruptedException { ConsumerFactory consumerFactory = mock(ConsumerFactory.class); @@ -200,10 +199,8 @@ void delayedIdleEvent() throws InterruptedException { containerProperties); CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(2); - AtomicReference eventTime = new AtomicReference<>(); container.setApplicationEventPublisher(event -> { if (event instanceof ListenerContainerIdleEvent) { - eventTime.set(System.currentTimeMillis()); latch1.countDown(); latch2.countDown(); } @@ -263,7 +260,7 @@ void testSyncRelativeSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -304,7 +301,7 @@ void testAsyncRelativeSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); Map>> recordMap = new HashMap<>(); recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); @@ -363,7 +360,7 @@ void testSyncTimestampSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -410,7 +407,7 @@ void testAsyncTimestampSeeks() throws InterruptedException { TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List assignments = Arrays.asList(tp0, tp1, tp2, tp3); + List assignments = List.of(tp0, tp1, tp2, tp3); Map>> recordMap = new HashMap<>(); recordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); recordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); @@ -508,7 +505,9 @@ void testBatchInterceptBeforeTx1() throws InterruptedException { } @SuppressWarnings({ "rawtypes", "unchecked" }) - void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batch) throws InterruptedException { + void testIntercept(boolean beforeTx, @Nullable AssignmentCommitOption option, boolean batch) + throws InterruptedException { + ConsumerFactory consumerFactory = mock(ConsumerFactory.class); final Consumer consumer = mock(Consumer.class); TopicPartition tp0 = new TopicPartition("foo", 0); @@ -523,7 +522,7 @@ void testIntercept(boolean beforeTx, AssignmentCommitOption option, boolean batc Thread.sleep(10); return firstOrSecondPoll.incrementAndGet() < 3 ? records : empty; }).given(consumer).poll(any()); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -676,7 +675,7 @@ void testInterceptInTxNonKafkaTM() throws InterruptedException { Thread.sleep(10); return firstOrSecondPoll.incrementAndGet() < 2 ? records : empty; }).given(consumer).poll(any()); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -771,7 +770,7 @@ void testNoCommitOnAssignmentWithEarliest() throws InterruptedException { return records; }).given(consumer).poll(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -814,7 +813,7 @@ private void testInitialCommitIBasedOnCommitted(boolean committed) throws Interr return records; }).given(consumer).poll(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); willAnswer(invocation -> { ((ConsumerRebalanceListener) invocation.getArgument(1)) .onPartitionsAssigned(assignments); @@ -865,7 +864,7 @@ void removeFromPartitionPauseRequestedWhenNotAssigned() throws InterruptedExcept return null; }).given(consumer).pause(any()); TopicPartition tp0 = new TopicPartition("foo", 0); - List assignments = Arrays.asList(tp0); + List assignments = List.of(tp0); AtomicReference rebal = new AtomicReference<>(); willAnswer(invocation -> { rebal.set(invocation.getArgument(1)); @@ -911,14 +910,14 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseLegacyAssi TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List allAssignments = Arrays.asList(tp0, tp1, tp2, tp3); + List allAssignments = List.of(tp0, tp1, tp2, tp3); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, Collections.singletonList(new ConsumerRecord("foo", 0, 0, null, "bar"))); allRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar"))); allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1, tp3); + List afterRevokeAssignments = List.of(tp1, tp3); Map>> afterRevokeRecordMap = new HashMap<>(); afterRevokeRecordMap.put(tp1, Collections.singletonList(new ConsumerRecord("foo", 1, 0, null, "bar"))); afterRevokeRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar"))); @@ -979,10 +978,11 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(allAssignments); rebal.get().onPartitionsAssigned(afterRevokeAssignments); rebalLatch.countDown(); @@ -991,11 +991,13 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, return ConsumerRecords.empty(); } return afterRevokeRecords; - default: + } + default -> { if (paused.get()) { return ConsumerRecords.empty(); } return afterRevokeRecords; + } } }).given(consumer).poll(any()); container.start(); @@ -1023,7 +1025,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign TopicPartition tp1 = new TopicPartition("foo", 1); TopicPartition tp2 = new TopicPartition("foo", 2); TopicPartition tp3 = new TopicPartition("foo", 3); - List allAssignments = Arrays.asList(tp0, tp1, tp2, tp3); + List allAssignments = List.of(tp0, tp1, tp2, tp3); Map>> allRecordMap = new LinkedHashMap<>(); ConsumerRecord record0 = new ConsumerRecord("foo", 0, 0, null, "bar"); ConsumerRecord record1 = new ConsumerRecord("foo", 1, 0, null, "bar"); @@ -1032,7 +1034,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign allRecordMap.put(tp2, Collections.singletonList(new ConsumerRecord("foo", 2, 0, null, "bar"))); allRecordMap.put(tp3, Collections.singletonList(new ConsumerRecord("foo", 3, 0, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List revokedAssignments = Arrays.asList(tp0, tp2); + List revokedAssignments = List.of(tp0, tp2); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1044,9 +1046,7 @@ void pruneRevokedPartitionsFromRemainingRecordsWhenSeekAfterErrorFalseCoopAssign return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1087,17 +1087,20 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(revokedAssignments); rebal.get().onPartitionsAssigned(Collections.emptyList()); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1128,14 +1131,14 @@ public boolean handleOne(Exception thrownException, ConsumerRecord record, void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException { TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); - List allAssignments = Arrays.asList(tp0, tp1); + List allAssignments = List.of(tp0, tp1); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar"))); allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1); + List afterRevokeAssignments = List.of(tp1); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1147,9 +1150,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1171,17 +1172,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(allAssignments); rebal.get().onPartitionsAssigned(afterRevokeAssignments); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1206,14 +1210,13 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws I void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws InterruptedException { TopicPartition tp0 = new TopicPartition("foo", 0); TopicPartition tp1 = new TopicPartition("foo", 1); - List allAssignments = Arrays.asList(tp0, tp1); + List allAssignments = List.of(tp0, tp1); Map>> allRecordMap = new HashMap<>(); allRecordMap.put(tp0, List.of(new ConsumerRecord("foo", 0, 0, null, "bar"), new ConsumerRecord("foo", 0, 1, null, "bar"))); allRecordMap.put(tp1, List.of(new ConsumerRecord("foo", 1, 0, null, "bar"), new ConsumerRecord("foo", 1, 1, null, "bar"))); ConsumerRecords allRecords = new ConsumerRecords<>(allRecordMap); - List afterRevokeAssignments = Arrays.asList(tp1); AtomicInteger pollPhase = new AtomicInteger(); Consumer consumer = mock(Consumer.class); @@ -1225,9 +1228,7 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int return null; }).given(consumer).subscribe(any(Collection.class), any()); CountDownLatch pauseLatch = new CountDownLatch(1); - AtomicBoolean paused = new AtomicBoolean(); willAnswer(inv -> { - paused.set(true); pauseLatch.countDown(); return null; }).given(consumer).pause(any()); @@ -1249,17 +1250,20 @@ void pruneRevokedPartitionsFromPendingOutOfOrderCommitsCoopAssignor() throws Int Thread.sleep(50); pollLatch.countDown(); switch (pollPhase.getAndIncrement()) { - case 0: + case 0 -> { rebal.get().onPartitionsAssigned(allAssignments); return allRecords; - case 1: + } + case 1 -> { rebal.get().onPartitionsRevoked(List.of(tp0)); rebal.get().onPartitionsAssigned(List.of(new TopicPartition("foo", 2))); rebalLatch.countDown(); continueLatch.await(10, TimeUnit.SECONDS); return ConsumerRecords.empty(); - default: + } + default -> { return ConsumerRecords.empty(); + } } }).given(consumer).poll(any()); container.start(); @@ -1285,7 +1289,7 @@ private AcknowledgingMessageListener ackOffset1() { @Override public void onMessage(ConsumerRecord rec, @Nullable Acknowledgment ack) { - if (rec.offset() == 1) { + if (rec.offset() == 1 && ack != null) { ack.acknowledge(); } } @@ -1299,7 +1303,7 @@ public void onMessage(Object data) { public static class TestMessageListener1 implements MessageListener, ConsumerSeekAware { - private static ThreadLocal callbacks = new ThreadLocal<>(); + private static final ThreadLocal callbacks = new ThreadLocal<>(); CountDownLatch latch = new CountDownLatch(1); @@ -1335,7 +1339,7 @@ public void onIdleContainer(Map assignments, ConsumerSeekC public static class TestMessageListener2 implements MessageListener, ConsumerSeekAware { - private static ThreadLocal callbacks = new ThreadLocal<>(); + private static final ThreadLocal callbacks = new ThreadLocal<>(); CountDownLatch latch = new CountDownLatch(1);