diff --git a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java index d165eb598c8..b325fa3c147 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -72,6 +72,21 @@ protected boolean isExpireGroupsUponCompletion() { return this.expireGroupsUponCompletion; } + /** + * Check an {@link Iterable} result for split possibility on the output production: + * the items of the collection have to be instances of {@link Message} + * or {@link org.springframework.integration.support.AbstractIntegrationMessageBuilder} + * and {@link #getOutputProcessor()} has to be a {@link SimpleMessageGroupProcessor}. + * Otherwise, a single reply message is emitted with the whole {@link Iterable} as its payload. + * @param reply the {@link Iterable} result to check for split possibility. + * @return true if the {@link Iterable} result has to be split into individual messages. + * @since 6.0 + */ + @Override + protected boolean shouldSplitOutput(Iterable reply) { + return getOutputProcessor() instanceof SimpleMessageGroupProcessor && super.shouldSplitOutput(reply); + } + /** * Complete the group and remove all its messages. * If the {@link #expireGroupsUponCompletion} is true, then remove group fully. diff --git a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java index 56effb430a9..97d4d632051 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java @@ -44,7 +44,6 @@ import org.springframework.integration.aggregator.AggregatingMessageHandler; import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy; import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy; -import org.springframework.integration.aggregator.SimpleMessageGroupProcessor; import org.springframework.integration.annotation.BridgeFrom; import org.springframework.integration.annotation.BridgeTo; import org.springframework.integration.annotation.Filter; @@ -69,6 +68,7 @@ import org.springframework.integration.filter.ExpressionEvaluatingSelector; import org.springframework.integration.history.MessageHistory; import org.springframework.integration.splitter.DefaultMessageSplitter; +import org.springframework.integration.store.MessageGroup; import org.springframework.integration.transformer.ExpressionEvaluatingTransformer; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -178,12 +178,11 @@ public void testMessagingAnnotationsFlow() throws InterruptedException { MessageHistory messageHistory = MessageHistory.read(message); assertThat(messageHistory).isNotNull(); String messageHistoryString = messageHistory.toString(); - assertThat(messageHistoryString).contains("routerChannel") - .contains("filterChannel") - .contains("aggregatorChannel") - .contains("splitterChannel") - .contains("serviceChannel") - .doesNotContain("discardChannel"); + assertThat(messageHistoryString) + .contains("routerChannel", "filterChannel", "aggregatorChannel", "serviceChannel") + .doesNotContain("discardChannel") + // history header is not overridden in splitter for individual message from message group emitted before + .doesNotContain("splitterChannel"); } assertThat(this.skippedServiceActivator).isNull(); @@ -248,10 +247,10 @@ public void testReactiveMessageHandler() { this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test")); StepVerifier.create( - this.contextConfiguration.messageMono - .asMono() - .map(Message::getPayload) - .cast(String.class)) + this.contextConfiguration.messageMono + .asMono() + .map(Message::getPayload) + .cast(String.class)) .expectNext("test") .verifyComplete(); } @@ -332,7 +331,7 @@ public MessageChannel aggregatorChannel() { @Bean @ServiceActivator(inputChannel = "aggregatorChannel") public MessageHandler aggregator() { - AggregatingMessageHandler handler = new AggregatingMessageHandler(new SimpleMessageGroupProcessor()); + AggregatingMessageHandler handler = new AggregatingMessageHandler(MessageGroup::getMessages); handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("1")); handler.setReleaseStrategy(new ExpressionEvaluatingReleaseStrategy("size() == 10")); handler.setOutputChannelName("splitterChannel"); @@ -346,7 +345,7 @@ public MessageChannel splitterChannel() { @Bean public CountDownLatch reactiveCustomizerLatch() { - return new CountDownLatch(10); + return new CountDownLatch(1); } @Bean diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java index 88fcb69ba8a..4a4ba76d649 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2021 the original author or authors. + * Copyright 2002-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -57,14 +57,14 @@ * properties can be referred to by name in the query string, e.g. * *
- * INSERT INTO FOOS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
+ * INSERT INTO ITEMS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
  * 
* *

* When a message payload is an instance of {@link Iterable}, a * {@link NamedParameterJdbcOperations#batchUpdate(String, SqlParameterSource[])} is performed, where each * {@link SqlParameterSource} instance is based on items wrapped into an internal {@link Message} implementation with - * headers from the request message. + * headers from the request message. The item is wrapped only if it is not a {@link Message} already. *

* When a {@link #preparedStatementSetter} is configured, it is applied for each item in the appropriate * {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function. @@ -219,19 +219,7 @@ protected List> executeUpdateQuery(final Message> messageStream = StreamSupport.stream(((Iterable) message.getPayload()).spliterator(), false) - .map(payload -> new Message() { - - @Override - public Object getPayload() { - return payload; - } - - @Override - public MessageHeaders getHeaders() { - return message.getHeaders(); - } - - }); + .map(payload -> payloadToMessage(payload, message.getHeaders())); int[] updates; @@ -288,4 +276,24 @@ public int getBatchSize() { } } + private static Message payloadToMessage(Object payload, MessageHeaders messageHeaders) { + if (payload instanceof Message) { + return (Message) payload; + } + + return new Message<>() { + + @Override + public Object getPayload() { + return payload; + } + + @Override + public MessageHeaders getHeaders() { + return messageHeaders; + } + + }; + } + } diff --git a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java index 53c9caaf9a8..29ff4b5fbc5 100644 --- a/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java +++ b/spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java @@ -17,11 +17,13 @@ package org.springframework.integration.jdbc; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -29,6 +31,7 @@ import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.BeanFactory; import org.springframework.integration.support.MessageBuilder; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase; @@ -108,6 +111,38 @@ public void testInsertBatch() { assertThat(foos.get(2).get("NAME")).isEqualTo("foo3"); } + @Test + public void testInsertBatchOfMessages() { + JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate, + "insert into foos (id, status, name) values (:id, 0, :payload)"); + ExpressionEvaluatingSqlParameterSourceFactory sqlParameterSourceFactory = + new ExpressionEvaluatingSqlParameterSourceFactory(); + sqlParameterSourceFactory.setParameterExpressions(Map.of("id", "headers.id", "payload", "payload")); + sqlParameterSourceFactory.setBeanFactory(mock(BeanFactory.class)); + handler.setSqlParameterSourceFactory(sqlParameterSourceFactory); + handler.afterPropertiesSet(); + + List> payload = + IntStream.range(1, 4) + .mapToObj(i -> "Item" + i) + .map(GenericMessage::new) + .toList(); + + handler.handleMessage(new GenericMessage<>(payload)); + + List> foos = jdbcTemplate.queryForList("SELECT * FROM FOOS ORDER BY NAME"); + + assertThat(foos.size()).isEqualTo(3); + + assertThat(foos.get(0).get("NAME")).isEqualTo("Item1"); + assertThat(foos.get(1).get("NAME")).isEqualTo("Item2"); + assertThat(foos.get(2).get("NAME")).isEqualTo("Item3"); + + assertThat(foos.get(0).get("ID")) + .isNotEqualTo(foos.get(0).get("NAME")) + .isEqualTo(payload.get(0).getHeaders().getId().toString()); + } + @Test public void testInsertWithMessagePreparedStatementSetter() { JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate, diff --git a/src/reference/asciidoc/aggregator.adoc b/src/reference/asciidoc/aggregator.adoc index fbe52349207..3b46abc2c18 100644 --- a/src/reference/asciidoc/aggregator.adoc +++ b/src/reference/asciidoc/aggregator.adoc @@ -132,8 +132,6 @@ Consequently, the `Collection` variable in the POJO is cleared too, if If you wish to simply release that collection as-is for further processing, you must build a new `Collection` (for example, `new ArrayList(messages)`). Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation. -If the `processMessageGroup` method of the `MessageGroupProcessor` returns a collection, it must be a collection of `Message` objects. -In this case, the messages are individually released. Prior to version 4.2, it was not possible to provide a `MessageGroupProcessor` by using XML configuration. Only POJO methods could be used for aggregation. Now, if the framework detects that the referenced (or inner) bean implements `MessageProcessor`, it is used as the aggregator's output processor. @@ -145,6 +143,10 @@ It returns the collection of messages from the group, which, as indicated earlie This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages. +Starting with version 6.0, the splitting behaviour, described above, works only if the group processor is a `SimpleMessageGroupProcessor`. +Otherwise, with any other `MessageGroupProcessor` implementation that returns a `Collection`, only a single reply message is emitted with the whole collection of messages as its payload. +Such logic is dictated by the canonical purpose of an aggregator - collect request messages by some key and produce a single grouped message. + ===== `ReleaseStrategy` The `ReleaseStrategy` interface is defined as follows: diff --git a/src/reference/asciidoc/error-handling.adoc b/src/reference/asciidoc/error-handling.adoc index 9cffedebd17..25530946288 100644 --- a/src/reference/asciidoc/error-handling.adoc +++ b/src/reference/asciidoc/error-handling.adoc @@ -62,7 +62,7 @@ It is recommended to use that `MessagePublishingErrorHandler` for any custom `ta A registered `integrationMessagePublishingErrorHandler` bean can be used in this case. To enable global error handling, register a handler on that channel. -For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the 'errorChannel'. +For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the `errorChannel`. That router can then spread the error messages across multiple channels, based on the `Exception` type. Starting with version 4.3.10, Spring Integration provides the `ErrorMessagePublisher` and the `ErrorMessageStrategy`. @@ -85,5 +85,7 @@ Including a resource and source of the bean definition helps to determine possib Starting with version 5.4.3, the default error channel is configured with the property `requireSubscribers = true` to not silently ignore messages when there are no subscribers on this channel (e.g. when application context is stopped). In this case a `MessageDispatchingException` is thrown which may lend on the client callback of the inbound channel adapter to negatively acknowledge (or roll back) an original message in the source system for redelivery or other future consideration. -To restore the previous behavior (ignore non dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`. +To restore the previous behavior (ignore non-dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`. See <<./configuration.adoc#global-properties,Global Properties>> and <<./channel.adoc#channel-configuration-pubsubchannel,`PublishSubscribeChannel` Configuration>> (if you configure a global `errorChannel` manually) for more information. + +See also https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/errorhandling[Error Handling Sample] for more information. diff --git a/src/reference/asciidoc/jdbc.adoc b/src/reference/asciidoc/jdbc.adoc index 14fd5ce2959..90abcab395d 100644 --- a/src/reference/asciidoc/jdbc.adoc +++ b/src/reference/asciidoc/jdbc.adoc @@ -284,7 +284,7 @@ It lets you specify a `MessagePreparedStatementSetter` bean reference. ==== Batch Update Starting with version 5.1, the `JdbcMessageHandler` performs a `JdbcOperations.batchUpdate()` if the payload of the request message is an `Iterable` instance. -Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message. +Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message if such an element is not a `Message` already. In the case of regular `SqlParameterSourceFactory`-based configuration these messages are used to build an `SqlParameterSource[]` for an argument used in the mentioned `JdbcOperations.batchUpdate()` function. When a `MessagePreparedStatementSetter` configuration is applied, a `BatchPreparedStatementSetter` variant is used to iterate over those messages for each item and the provided `MessagePreparedStatementSetter` is called against them. The batch update is not supported when `keysGenerated` mode is selected. diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index a0f1cee4694..5ede329d78a 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -43,6 +43,10 @@ For convenience, the XML and Java DSL for Scatter-Gather, based on the `Recipien See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information. +The `AggregatingMessageHandler` now does not split a `Collection>` result of the `MessageGroupProcessor` (unless it is a `SimpleMessageGroupProcessor`) on the output, but emits a single message containing this whole collection as a payload. + +See <<./aggregator.adoc#aggregator,Aggregator>> for more information. + [[x6.0-http]] === HTTP Changes