Skip to content

Commit b0bb812

Browse files
committed
spring-projectsGH-3797: Improve batch processing in the framework
Fixes spring-projects#3797 * Handle `Message` items of the `Iterable` payload properly in the `JdbcMessageHandler`. Otherwise, they've been wrapped into an extra `Message` * Produce a single message with a `Collection<Message<?>>` payload in the `AggregatingMessageHandler` when the `getOutputProcessor()` is not an instance of `SimpleMessageGroupProcessor` * Mention these changes in docs * Point to the error handling sample from docs
1 parent 4f49038 commit b0bb812

File tree

8 files changed

+100
-35
lines changed

8 files changed

+100
-35
lines changed

spring-integration-core/src/main/java/org/springframework/integration/aggregator/AggregatingMessageHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,6 +72,21 @@ protected boolean isExpireGroupsUponCompletion() {
7272
return this.expireGroupsUponCompletion;
7373
}
7474

75+
/**
76+
* Check an {@link Iterable} result for split possibility on the output production:
77+
* the items of the collection have to be instances of {@link Message}
78+
* or {@link org.springframework.integration.support.AbstractIntegrationMessageBuilder}
79+
* and {@link #getOutputProcessor()} has to be a {@link SimpleMessageGroupProcessor}.
80+
* Otherwise, a single reply message is emitted with the whole {@link Iterable} as its payload.
81+
* @param reply the {@link Iterable} result to check for split possibility.
82+
* @return true if the {@link Iterable} result has to be split into individual messages.
83+
* @since 6.0
84+
*/
85+
@Override
86+
protected boolean shouldSplitOutput(Iterable<?> reply) {
87+
return getOutputProcessor() instanceof SimpleMessageGroupProcessor && super.shouldSplitOutput(reply);
88+
}
89+
7590
/**
7691
* Complete the group and remove all its messages.
7792
* If the {@link #expireGroupsUponCompletion} is true, then remove group fully.

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/MessagingAnnotationsWithBeanAnnotationTests.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import org.springframework.integration.aggregator.AggregatingMessageHandler;
4545
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
4646
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
47-
import org.springframework.integration.aggregator.SimpleMessageGroupProcessor;
4847
import org.springframework.integration.annotation.BridgeFrom;
4948
import org.springframework.integration.annotation.BridgeTo;
5049
import org.springframework.integration.annotation.Filter;
@@ -69,6 +68,7 @@
6968
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
7069
import org.springframework.integration.history.MessageHistory;
7170
import org.springframework.integration.splitter.DefaultMessageSplitter;
71+
import org.springframework.integration.store.MessageGroup;
7272
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
7373
import org.springframework.messaging.Message;
7474
import org.springframework.messaging.MessageChannel;
@@ -178,12 +178,11 @@ public void testMessagingAnnotationsFlow() throws InterruptedException {
178178
MessageHistory messageHistory = MessageHistory.read(message);
179179
assertThat(messageHistory).isNotNull();
180180
String messageHistoryString = messageHistory.toString();
181-
assertThat(messageHistoryString).contains("routerChannel")
182-
.contains("filterChannel")
183-
.contains("aggregatorChannel")
184-
.contains("splitterChannel")
185-
.contains("serviceChannel")
186-
.doesNotContain("discardChannel");
181+
assertThat(messageHistoryString)
182+
.contains("routerChannel", "filterChannel", "aggregatorChannel", "serviceChannel")
183+
.doesNotContain("discardChannel")
184+
// history header is not overridden in splitter for individual message from message group emitted before
185+
.doesNotContain("splitterChannel");
187186
}
188187

189188
assertThat(this.skippedServiceActivator).isNull();
@@ -248,10 +247,10 @@ public void testReactiveMessageHandler() {
248247
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));
249248

250249
StepVerifier.create(
251-
this.contextConfiguration.messageMono
252-
.asMono()
253-
.map(Message::getPayload)
254-
.cast(String.class))
250+
this.contextConfiguration.messageMono
251+
.asMono()
252+
.map(Message::getPayload)
253+
.cast(String.class))
255254
.expectNext("test")
256255
.verifyComplete();
257256
}
@@ -332,7 +331,7 @@ public MessageChannel aggregatorChannel() {
332331
@Bean
333332
@ServiceActivator(inputChannel = "aggregatorChannel")
334333
public MessageHandler aggregator() {
335-
AggregatingMessageHandler handler = new AggregatingMessageHandler(new SimpleMessageGroupProcessor());
334+
AggregatingMessageHandler handler = new AggregatingMessageHandler(MessageGroup::getMessages);
336335
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("1"));
337336
handler.setReleaseStrategy(new ExpressionEvaluatingReleaseStrategy("size() == 10"));
338337
handler.setOutputChannelName("splitterChannel");
@@ -346,7 +345,7 @@ public MessageChannel splitterChannel() {
346345

347346
@Bean
348347
public CountDownLatch reactiveCustomizerLatch() {
349-
return new CountDownLatch(10);
348+
return new CountDownLatch(1);
350349
}
351350

352351
@Bean

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -57,14 +57,14 @@
5757
* properties can be referred to by name in the query string, e.g.
5858
*
5959
* <pre class="code">
60-
* INSERT INTO FOOS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
60+
* INSERT INTO ITEMS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
6161
* </pre>
6262
*
6363
* <p>
6464
* When a message payload is an instance of {@link Iterable}, a
6565
* {@link NamedParameterJdbcOperations#batchUpdate(String, SqlParameterSource[])} is performed, where each
6666
* {@link SqlParameterSource} instance is based on items wrapped into an internal {@link Message} implementation with
67-
* headers from the request message.
67+
* headers from the request message. The item is wrapped only if it is not a {@link Message} already.
6868
* <p>
6969
* When a {@link #preparedStatementSetter} is configured, it is applied for each item in the appropriate
7070
* {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function.
@@ -219,19 +219,7 @@ protected List<? extends Map<String, Object>> executeUpdateQuery(final Message<?
219219
if (message.getPayload() instanceof Iterable) {
220220
Stream<? extends Message<?>> messageStream =
221221
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
222-
.map(payload -> new Message<Object>() {
223-
224-
@Override
225-
public Object getPayload() {
226-
return payload;
227-
}
228-
229-
@Override
230-
public MessageHeaders getHeaders() {
231-
return message.getHeaders();
232-
}
233-
234-
});
222+
.map(payload -> payloadToMessage(payload, message.getHeaders()));
235223

236224
int[] updates;
237225

@@ -288,4 +276,24 @@ public int getBatchSize() {
288276
}
289277
}
290278

279+
private static Message<?> payloadToMessage(Object payload, MessageHeaders messageHeaders) {
280+
if (payload instanceof Message) {
281+
return (Message<?>) payload;
282+
}
283+
284+
return new Message<>() {
285+
286+
@Override
287+
public Object getPayload() {
288+
return payload;
289+
}
290+
291+
@Override
292+
public MessageHeaders getHeaders() {
293+
return messageHeaders;
294+
}
295+
296+
};
297+
}
298+
291299
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,21 @@
1717
package org.springframework.integration.jdbc;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.Mockito.mock;
2021

2122
import java.util.Arrays;
2223
import java.util.List;
2324
import java.util.Map;
2425
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.stream.IntStream;
2527

2628
import org.junit.jupiter.api.AfterAll;
2729
import org.junit.jupiter.api.AfterEach;
2830
import org.junit.jupiter.api.BeforeAll;
2931
import org.junit.jupiter.api.Disabled;
3032
import org.junit.jupiter.api.Test;
3133

34+
import org.springframework.beans.factory.BeanFactory;
3235
import org.springframework.integration.support.MessageBuilder;
3336
import org.springframework.jdbc.core.JdbcTemplate;
3437
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
@@ -108,6 +111,38 @@ public void testInsertBatch() {
108111
assertThat(foos.get(2).get("NAME")).isEqualTo("foo3");
109112
}
110113

114+
@Test
115+
public void testInsertBatchOfMessages() {
116+
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,
117+
"insert into foos (id, status, name) values (:id, 0, :payload)");
118+
ExpressionEvaluatingSqlParameterSourceFactory sqlParameterSourceFactory =
119+
new ExpressionEvaluatingSqlParameterSourceFactory();
120+
sqlParameterSourceFactory.setParameterExpressions(Map.of("id", "headers.id", "payload", "payload"));
121+
sqlParameterSourceFactory.setBeanFactory(mock(BeanFactory.class));
122+
handler.setSqlParameterSourceFactory(sqlParameterSourceFactory);
123+
handler.afterPropertiesSet();
124+
125+
List<GenericMessage<String>> payload =
126+
IntStream.range(1, 4)
127+
.mapToObj(i -> "Item" + i)
128+
.map(GenericMessage::new)
129+
.toList();
130+
131+
handler.handleMessage(new GenericMessage<>(payload));
132+
133+
List<Map<String, Object>> foos = jdbcTemplate.queryForList("SELECT * FROM FOOS ORDER BY NAME");
134+
135+
assertThat(foos.size()).isEqualTo(3);
136+
137+
assertThat(foos.get(0).get("NAME")).isEqualTo("Item1");
138+
assertThat(foos.get(1).get("NAME")).isEqualTo("Item2");
139+
assertThat(foos.get(2).get("NAME")).isEqualTo("Item3");
140+
141+
assertThat(foos.get(0).get("ID"))
142+
.isNotEqualTo(foos.get(0).get("NAME"))
143+
.isEqualTo(payload.get(0).getHeaders().getId().toString());
144+
}
145+
111146
@Test
112147
public void testInsertWithMessagePreparedStatementSetter() {
113148
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,

src/reference/asciidoc/aggregator.adoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,8 +132,6 @@ Consequently, the `Collection<Message>` variable in the POJO is cleared too, if
132132
If you wish to simply release that collection as-is for further processing, you must build a new `Collection` (for example, `new ArrayList<Message>(messages)`).
133133
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.
134134

135-
If the `processMessageGroup` method of the `MessageGroupProcessor` returns a collection, it must be a collection of `Message<?>` objects.
136-
In this case, the messages are individually released.
137135
Prior to version 4.2, it was not possible to provide a `MessageGroupProcessor` by using XML configuration.
138136
Only POJO methods could be used for aggregation.
139137
Now, if the framework detects that the referenced (or inner) bean implements `MessageProcessor`, it is used as the aggregator's output processor.
@@ -145,6 +143,10 @@ It returns the collection of messages from the group, which, as indicated earlie
145143

146144
This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.
147145

146+
Starting with version 6.0, a splitting behaviour, described above, works only for a mentioned `SimpleMessageGroupProcessor`.
147+
Otherwise, with any other `MessageGroupProcessor` implementation which returns a `Collection<Message>`, only a single reply message is emitted with the whole collection of messages as its payload.
148+
Such a logic is dictated with a canonical purpose of an aggregator - collect request messages by some key and produce a single grouped message.
149+
148150
===== `ReleaseStrategy`
149151

150152
The `ReleaseStrategy` interface is defined as follows:

src/reference/asciidoc/error-handling.adoc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ It is recommended to use that `MessagePublishingErrorHandler` for any custom `ta
6262
A registered `integrationMessagePublishingErrorHandler` bean can be used in this case.
6363

6464
To enable global error handling, register a handler on that channel.
65-
For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the 'errorChannel'.
65+
For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the `errorChannel`.
6666
That router can then spread the error messages across multiple channels, based on the `Exception` type.
6767

6868
Starting with version 4.3.10, Spring Integration provides the `ErrorMessagePublisher` and the `ErrorMessageStrategy`.
@@ -85,5 +85,7 @@ Including a resource and source of the bean definition helps to determine possib
8585

8686
Starting with version 5.4.3, the default error channel is configured with the property `requireSubscribers = true` to not silently ignore messages when there are no subscribers on this channel (e.g. when application context is stopped).
8787
In this case a `MessageDispatchingException` is thrown which may lend on the client callback of the inbound channel adapter to negatively acknowledge (or roll back) an original message in the source system for redelivery or other future consideration.
88-
To restore the previous behavior (ignore non dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`.
88+
To restore the previous behavior (ignore non-dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`.
8989
See <<./configuration.adoc#global-properties,Global Properties>> and <<./channel.adoc#channel-configuration-pubsubchannel,`PublishSubscribeChannel` Configuration>> (if you configure a global `errorChannel` manually) for more information.
90+
91+
See also https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/errorhandling[Error Handling Sample] for more information.

src/reference/asciidoc/jdbc.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ It lets you specify a `MessagePreparedStatementSetter` bean reference.
284284
==== Batch Update
285285

286286
Starting with version 5.1, the `JdbcMessageHandler` performs a `JdbcOperations.batchUpdate()` if the payload of the request message is an `Iterable` instance.
287-
Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message.
287+
Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message if such an element is not a `Message` already.
288288
In the case of regular `SqlParameterSourceFactory`-based configuration these messages are used to build an `SqlParameterSource[]` for an argument used in the mentioned `JdbcOperations.batchUpdate()` function.
289289
When a `MessagePreparedStatementSetter` configuration is applied, a `BatchPreparedStatementSetter` variant is used to iterate over those messages for each item and the provided `MessagePreparedStatementSetter` is called against them.
290290
The batch update is not supported when `keysGenerated` mode is selected.

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@ For convenience, the XML and Java DSL for Scatter-Gather, based on the `Recipien
4343

4444
See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information.
4545

46+
The `AggregatingMessageHandler` now does not split the `Collection<Message<?>>` result of the `MessageGroupProcessor` (excluding `SimpleMessageGroupProcessor`) on the output, but emits a single message containing this whole collection as a payload.
47+
48+
See <<./aggregator.adoc#aggregator,Aggregator>> for more information.
49+
4650
[[x6.0-http]]
4751
=== HTTP Changes
4852

0 commit comments

Comments
 (0)