Skip to content

GH-3797: Improve batch processing in the framework #3820

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -72,6 +72,21 @@ protected boolean isExpireGroupsUponCompletion() {
return this.expireGroupsUponCompletion;
}

/**
* Check an {@link Iterable} result for split possibility on the output production:
* the items of the collection have to be instances of {@link Message}
* or {@link org.springframework.integration.support.AbstractIntegrationMessageBuilder}
* and {@link #getOutputProcessor()} has to be a {@link SimpleMessageGroupProcessor}.
* Otherwise, a single reply message is emitted with the whole {@link Iterable} as its payload.
* @param reply the {@link Iterable} result to check for split possibility.
* @return true if the {@link Iterable} result has to be split into individual messages.
* @since 6.0
*/
@Override
protected boolean shouldSplitOutput(Iterable<?> reply) {
return getOutputProcessor() instanceof SimpleMessageGroupProcessor && super.shouldSplitOutput(reply);
}

/**
* Complete the group and remove all its messages.
* If the {@link #expireGroupsUponCompletion} is true, then remove group fully.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
import org.springframework.integration.aggregator.ExpressionEvaluatingReleaseStrategy;
import org.springframework.integration.aggregator.SimpleMessageGroupProcessor;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.annotation.BridgeTo;
import org.springframework.integration.annotation.Filter;
Expand All @@ -69,6 +68,7 @@
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.history.MessageHistory;
import org.springframework.integration.splitter.DefaultMessageSplitter;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.transformer.ExpressionEvaluatingTransformer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
Expand Down Expand Up @@ -178,12 +178,11 @@ public void testMessagingAnnotationsFlow() throws InterruptedException {
MessageHistory messageHistory = MessageHistory.read(message);
assertThat(messageHistory).isNotNull();
String messageHistoryString = messageHistory.toString();
assertThat(messageHistoryString).contains("routerChannel")
.contains("filterChannel")
.contains("aggregatorChannel")
.contains("splitterChannel")
.contains("serviceChannel")
.doesNotContain("discardChannel");
assertThat(messageHistoryString)
.contains("routerChannel", "filterChannel", "aggregatorChannel", "serviceChannel")
.doesNotContain("discardChannel")
// history header is not overridden in splitter for individual message from message group emitted before
.doesNotContain("splitterChannel");
}

assertThat(this.skippedServiceActivator).isNull();
Expand Down Expand Up @@ -248,10 +247,10 @@ public void testReactiveMessageHandler() {
this.reactiveMessageHandlerChannel.send(new GenericMessage<>("test"));

StepVerifier.create(
this.contextConfiguration.messageMono
.asMono()
.map(Message::getPayload)
.cast(String.class))
this.contextConfiguration.messageMono
.asMono()
.map(Message::getPayload)
.cast(String.class))
.expectNext("test")
.verifyComplete();
}
Expand Down Expand Up @@ -332,7 +331,7 @@ public MessageChannel aggregatorChannel() {
@Bean
@ServiceActivator(inputChannel = "aggregatorChannel")
public MessageHandler aggregator() {
AggregatingMessageHandler handler = new AggregatingMessageHandler(new SimpleMessageGroupProcessor());
AggregatingMessageHandler handler = new AggregatingMessageHandler(MessageGroup::getMessages);
handler.setCorrelationStrategy(new ExpressionEvaluatingCorrelationStrategy("1"));
handler.setReleaseStrategy(new ExpressionEvaluatingReleaseStrategy("size() == 10"));
handler.setOutputChannelName("splitterChannel");
Expand All @@ -346,7 +345,7 @@ public MessageChannel splitterChannel() {

@Bean
public CountDownLatch reactiveCustomizerLatch() {
return new CountDownLatch(10);
return new CountDownLatch(1);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,14 +57,14 @@
* properties can be referred to by name in the query string, e.g.
*
* <pre class="code">
* INSERT INTO FOOS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
* INSERT INTO ITEMS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
* </pre>
*
* <p>
* When a message payload is an instance of {@link Iterable}, a
* {@link NamedParameterJdbcOperations#batchUpdate(String, SqlParameterSource[])} is performed, where each
* {@link SqlParameterSource} instance is based on items wrapped into an internal {@link Message} implementation with
* headers from the request message.
* headers from the request message. The item is wrapped only if it is not a {@link Message} already.
* <p>
* When a {@link #preparedStatementSetter} is configured, it is applied for each item in the appropriate
* {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function.
Expand Down Expand Up @@ -219,19 +219,7 @@ protected List<? extends Map<String, Object>> executeUpdateQuery(final Message<?
if (message.getPayload() instanceof Iterable) {
Stream<? extends Message<?>> messageStream =
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
.map(payload -> new Message<Object>() {

@Override
public Object getPayload() {
return payload;
}

@Override
public MessageHeaders getHeaders() {
return message.getHeaders();
}

});
.map(payload -> payloadToMessage(payload, message.getHeaders()));

int[] updates;

Expand Down Expand Up @@ -288,4 +276,24 @@ public int getBatchSize() {
}
}

private static Message<?> payloadToMessage(Object payload, MessageHeaders messageHeaders) {
if (payload instanceof Message) {
return (Message<?>) payload;
}

return new Message<>() {

@Override
public Object getPayload() {
return payload;
}

@Override
public MessageHeaders getHeaders() {
return messageHeaders;
}

};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@
package org.springframework.integration.jdbc;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabase;
Expand Down Expand Up @@ -108,6 +111,38 @@ public void testInsertBatch() {
assertThat(foos.get(2).get("NAME")).isEqualTo("foo3");
}

@Test
public void testInsertBatchOfMessages() {
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,
"insert into foos (id, status, name) values (:id, 0, :payload)");
ExpressionEvaluatingSqlParameterSourceFactory sqlParameterSourceFactory =
new ExpressionEvaluatingSqlParameterSourceFactory();
sqlParameterSourceFactory.setParameterExpressions(Map.of("id", "headers.id", "payload", "payload"));
sqlParameterSourceFactory.setBeanFactory(mock(BeanFactory.class));
handler.setSqlParameterSourceFactory(sqlParameterSourceFactory);
handler.afterPropertiesSet();

List<GenericMessage<String>> payload =
IntStream.range(1, 4)
.mapToObj(i -> "Item" + i)
.map(GenericMessage::new)
.toList();

handler.handleMessage(new GenericMessage<>(payload));

List<Map<String, Object>> foos = jdbcTemplate.queryForList("SELECT * FROM FOOS ORDER BY NAME");

assertThat(foos.size()).isEqualTo(3);

assertThat(foos.get(0).get("NAME")).isEqualTo("Item1");
assertThat(foos.get(1).get("NAME")).isEqualTo("Item2");
assertThat(foos.get(2).get("NAME")).isEqualTo("Item3");

assertThat(foos.get(0).get("ID"))
.isNotEqualTo(foos.get(0).get("NAME"))
.isEqualTo(payload.get(0).getHeaders().getId().toString());
}

@Test
public void testInsertWithMessagePreparedStatementSetter() {
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,
Expand Down
6 changes: 4 additions & 2 deletions src/reference/asciidoc/aggregator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ Consequently, the `Collection<Message>` variable in the POJO is cleared too, if
If you wish to simply release that collection as-is for further processing, you must build a new `Collection` (for example, `new ArrayList<Message>(messages)`).
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation.

If the `processMessageGroup` method of the `MessageGroupProcessor` returns a collection, it must be a collection of `Message<?>` objects.
In this case, the messages are individually released.
Prior to version 4.2, it was not possible to provide a `MessageGroupProcessor` by using XML configuration.
Only POJO methods could be used for aggregation.
Now, if the framework detects that the referenced (or inner) bean implements `MessageProcessor`, it is used as the aggregator's output processor.
Expand All @@ -145,6 +143,10 @@ It returns the collection of messages from the group, which, as indicated earlie

This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.

Starting with version 6.0, the splitting behaviour, described above, works only if the group processor is a `SimpleMessageGroupProcessor`.
Otherwise, with any other `MessageGroupProcessor` implementation that returns a `Collection<Message>`, only a single reply message is emitted with the whole collection of messages as its payload.
Such logic is dictated by the canonical purpose of an aggregator - collect request messages by some key and produce a single grouped message.

===== `ReleaseStrategy`

The `ReleaseStrategy` interface is defined as follows:
Expand Down
6 changes: 4 additions & 2 deletions src/reference/asciidoc/error-handling.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ It is recommended to use that `MessagePublishingErrorHandler` for any custom `ta
A registered `integrationMessagePublishingErrorHandler` bean can be used in this case.

To enable global error handling, register a handler on that channel.
For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the 'errorChannel'.
For example, you can configure Spring Integration's `ErrorMessageExceptionTypeRouter` as the handler of an endpoint that is subscribed to the `errorChannel`.
That router can then spread the error messages across multiple channels, based on the `Exception` type.

Starting with version 4.3.10, Spring Integration provides the `ErrorMessagePublisher` and the `ErrorMessageStrategy`.
Expand All @@ -85,5 +85,7 @@ Including a resource and source of the bean definition helps to determine possib

Starting with version 5.4.3, the default error channel is configured with the property `requireSubscribers = true` to not silently ignore messages when there are no subscribers on this channel (e.g. when application context is stopped).
In this case a `MessageDispatchingException` is thrown which may lend on the client callback of the inbound channel adapter to negatively acknowledge (or roll back) an original message in the source system for redelivery or other future consideration.
To restore the previous behavior (ignore non dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`.
To restore the previous behavior (ignore non-dispatched error messages), the global integration property `spring.integration.channels.error.requireSubscribers` must be set to `false`.
See <<./configuration.adoc#global-properties,Global Properties>> and <<./channel.adoc#channel-configuration-pubsubchannel,`PublishSubscribeChannel` Configuration>> (if you configure a global `errorChannel` manually) for more information.

See also https://github.com/spring-projects/spring-integration-samples/tree/main/intermediate/errorhandling[Error Handling Sample] for more information.
2 changes: 1 addition & 1 deletion src/reference/asciidoc/jdbc.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ It lets you specify a `MessagePreparedStatementSetter` bean reference.
==== Batch Update

Starting with version 5.1, the `JdbcMessageHandler` performs a `JdbcOperations.batchUpdate()` if the payload of the request message is an `Iterable` instance.
Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message.
Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message if such an element is not a `Message` already.
In the case of regular `SqlParameterSourceFactory`-based configuration these messages are used to build an `SqlParameterSource[]` for an argument used in the mentioned `JdbcOperations.batchUpdate()` function.
When a `MessagePreparedStatementSetter` configuration is applied, a `BatchPreparedStatementSetter` variant is used to iterate over those messages for each item and the provided `MessagePreparedStatementSetter` is called against them.
The batch update is not supported when `keysGenerated` mode is selected.
Expand Down
4 changes: 4 additions & 0 deletions src/reference/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ For convenience, the XML and Java DSL for Scatter-Gather, based on the `Recipien

See <<./scatter-gather.adoc#scatter-gather,Scatter-Gather>> for more information.

The `AggregatingMessageHandler` now does not split a `Collection<Message<?>>` result of the `MessageGroupProcessor` (unless it is a `SimpleMessageGroupProcessor`) on the output, but emits a single message containing this whole collection as a payload.

See <<./aggregator.adoc#aggregator,Aggregator>> for more information.

[[x6.0-http]]
=== HTTP Changes

Expand Down