diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java
index d165eb598c8..b325fa3c147 100644
--- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java
+++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2019 the original author or authors.
+ * Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -72,6 +72,21 @@ protected boolean isExpireGroupsUponCompletion() {
return this.expireGroupsUponCompletion;
}
+ /**
+ * Check an {@link Iterable} result for split possibility on the output production:
+ * the items of the collection have to be instances of {@link Message}
+ * or {@link org.springframework.integration.support.AbstractIntegrationMessageBuilder}
+ * and {@link #getOutputProcessor()} has to be a {@link SimpleMessageGroupProcessor}.
+ * Otherwise, a single reply message is emitted with the whole {@link Iterable} as its payload.
+ * @param reply the {@link Iterable} result to check for split possibility.
+ * @return true if the {@link Iterable} result has to be split into individual messages.
+ * @since 6.0
+ */
+ @Override
+ protected boolean shouldSplitOutput(Iterable> reply) {
+ return getOutputProcessor() instanceof SimpleMessageGroupProcessor && super.shouldSplitOutput(reply);
+ }
+
/**
* Complete the group and remove all its messages.
* If the {@link #expireGroupsUponCompletion} is true, then remove group fully.
diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
index 56effb430a9..97d4d632051 100644
--- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
+++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java
@@ -44,7 +44,6 @@
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
-import org.springframework.integration.aggregator.SimpleMessageGroupProcessor;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Filter;
@@ -69,6 +68,7 @@
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.splitter.DefaultMessageSplitter;
+import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
@@ -178,12 +178,11 @@ public void testMessagingAnnotationsFlow() throws InterruptedException {
MessageHistory messageHistory = MessageHistory.read(message);
assertThat(messageHistory).isNotNull();
String messageHistoryString = messageHistory.toString();
- assertThat(messageHistoryString).contains("routerChannel")
- .contains("filterChannel")
- .contains("aggregatorChannel")
- .contains("splitterChannel")
- .contains("serviceChannel")
- .doesNotContain("discardChannel");
+ assertThat(messageHistoryString)
+ .contains("routerChannel", "filterChannel", "aggregatorChannel", "serviceChannel")
+ .doesNotContain("discardChannel")
+ // history header is not overridden in splitter for individual message from message group emitted before
+ .doesNotContain("splitterChannel");
}
assertThat(this.skippedServiceActivator).isNull();
@@ -248,10 +247,10 @@ public void testReactiveMessageHandler() {
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));
StepVerifier.create(
- this.contextConfiguration.messageMono
- .asMono()
- .map(Message::getPayload)
- .cast(String.class))
+ this.contextConfiguration.messageMono
+ .asMono()
+ .map(Message::getPayload)
+ .cast(String.class))
.expectNext("test")
.verifyComplete();
}
@@ -332,7 +331,7 @@ public MessageChannel aggregatorChannel() {
@Bean
@ServiceActivator(inputChannel = "aggregatorChannel")
public MessageHandler aggregator() {
- AggregatingMessageHandler handler = new AggregatingMessageHandler(new SimpleMessageGroupProcessor());
+ AggregatingMessageHandler handler = new AggregatingMessageHandler(MessageGroup::getMessages);
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("1"));
handler.setReleaseStrategy(new ExpressionEvaluatingReleaseStrategy("size() == 10"));
handler.setOutputChannelName("splitterChannel");
@@ -346,7 +345,7 @@ public MessageChannel splitterChannel() {
@Bean
public CountDownLatch reactiveCustomizerLatch() {
- return new CountDownLatch(10);
+ return new CountDownLatch(1);
}
@Bean
diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java
index 88fcb69ba8a..4a4ba76d649 100644
--- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java
+++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2021 the original author or authors.
+ * Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -57,14 +57,14 @@
* properties can be referred to by name in the query string, e.g.
*
*
* When a message payload is an instance of {@link Iterable}, a
* {@link NamedParameterJdbcOperations#batchUpdate(String, SqlParameterSource[])} is performed, where each
* {@link SqlParameterSource} instance is based on items wrapped into an internal {@link Message} implementation with
- * headers from the request message.
+ * headers from the request message. The item is wrapped only if it is not a {@link Message} already.
*
* When a {@link #preparedStatementSetter} is configured, it is applied for each item in the appropriate
* {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function.
@@ -219,19 +219,7 @@ protected List extends Map> executeUpdateQuery(final Message
if (message.getPayload() instanceof Iterable) {
Stream extends Message>> messageStream =
StreamSupport.stream(((Iterable>) message.getPayload()).spliterator(), false)
- .map(payload -> new Message