diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java index 45d9dd85a6..b8883ad935 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java @@ -25,7 +25,6 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; -import java.util.stream.Collectors; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; @@ -121,12 +120,12 @@ public void setAlwaysClientIdSuffix(boolean alwaysClientIdSuffix) { public List> getContainers() { this.lifecycleLock.lock(); try { - return Collections.unmodifiableList(new ArrayList<>(this.containers)); + return List.copyOf(this.containers); } finally { this.lifecycleLock.unlock(); } -} + } @Override public MessageListenerContainer getContainerFor(String topic, int partition) { @@ -157,7 +156,7 @@ public Collection getAssignedPartitions() { .map(KafkaMessageListenerContainer::getAssignedPartitions) .filter(Objects::nonNull) .flatMap(Collection::stream) - .collect(Collectors.toList()); + .toList(); } finally { this.lifecycleLock.unlock(); @@ -259,7 +258,6 @@ protected void doStart() { } } - @SuppressWarnings("deprecation") private void configureChildContainer(int index, KafkaMessageListenerContainer container) { String beanName = getBeanName(); beanName = (beanName == null ? "consumer" : beanName) + "-" + index; @@ -308,13 +306,17 @@ private KafkaMessageListenerContainer constructContainer(ContainerProperti return container; } + @Nullable private TopicPartitionOffset[] partitionSubset(ContainerProperties containerProperties, int index) { TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions(); + if (topicPartitions == null) { + return null; + } if (this.concurrency == 1) { - return topicPartitions; // NOSONAR + return topicPartitions; } else { - int numPartitions = topicPartitions.length; // NOSONAR + int numPartitions = topicPartitions.length; if (numPartitions == this.concurrency) { return new TopicPartitionOffset[] { topicPartitions[index] }; } @@ -389,7 +391,7 @@ && getContainerProperties().isRestartAfterAuthExceptions() if (exec == null) { exec = new SimpleAsyncTaskExecutor(getListenerId() + ".authRestart"); } - exec.execute(() -> start()); + exec.execute(this::start); } } @@ -477,10 +479,15 @@ public boolean isPartitionPaused(TopicPartition topicPartition) { public boolean isInExpectedState() { this.lifecycleLock.lock(); try { - return (isRunning() || isStoppedNormally()) && this.containers - .stream() - .map(container -> container.isInExpectedState()) - .allMatch(bool -> Boolean.TRUE.equals(bool)); + boolean isInExpectedState = isRunning() || isStoppedNormally(); + if (isInExpectedState) { + for (KafkaMessageListenerContainer container : this.containers) { + if (!container.isInExpectedState()) { + return false; + } + } + } + return isInExpectedState; } finally { this.lifecycleLock.unlock();