Skip to content

Commit 4a85849

Browse files
artembilangaryrussell
authored andcommitted
INT-3364: Add batchUpdate into JdbcMessageHandler (#2534)
* INT-3364: Add batchUpdate into JdbcMessageHandler JIRA: https://jira.spring.io/browse/INT-3364 * * Optimize items mapping with an internal `Message` implementation * Polishing Docs and Javadocs
1 parent 34ac66d commit 4a85849

File tree

4 files changed

+174
-48
lines changed

4 files changed

+174
-48
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/JdbcMessageHandler.java

Lines changed: 120 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,70 +18,86 @@
1818

1919
import java.sql.PreparedStatement;
2020
import java.sql.ResultSet;
21+
import java.sql.SQLException;
2122
import java.sql.Statement;
23+
import java.util.Arrays;
2224
import java.util.Collections;
2325
import java.util.LinkedList;
2426
import java.util.List;
2527
import java.util.Map;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.Stream;
30+
import java.util.stream.StreamSupport;
2631

2732
import javax.sql.DataSource;
2833

2934
import org.springframework.integration.handler.AbstractMessageHandler;
35+
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
3036
import org.springframework.jdbc.core.ColumnMapRowMapper;
3137
import org.springframework.jdbc.core.JdbcOperations;
32-
import org.springframework.jdbc.core.PreparedStatementCallback;
3338
import org.springframework.jdbc.core.PreparedStatementCreator;
3439
import org.springframework.jdbc.core.ResultSetExtractor;
3540
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
36-
import org.springframework.jdbc.core.namedparam.EmptySqlParameterSource;
3741
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
3842
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
3943
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
4044
import org.springframework.jdbc.support.GeneratedKeyHolder;
4145
import org.springframework.jdbc.support.JdbcUtils;
4246
import org.springframework.jdbc.support.KeyHolder;
4347
import org.springframework.messaging.Message;
48+
import org.springframework.messaging.MessageHeaders;
4449
import org.springframework.util.Assert;
4550
import org.springframework.util.LinkedCaseInsensitiveMap;
4651

4752
/**
4853
* A message handler that executes an SQL update. Dynamic query parameters are supported through the
4954
* {@link SqlParameterSourceFactory} abstraction, the default implementation of which wraps the message so that its bean
50-
* properties can be referred to by name in the query string E.g.
55+
* properties can be referred to by name in the query string, e.g.
5156
*
5257
* <pre class="code">
5358
* INSERT INTO FOOS (MESSAGE_ID, PAYLOAD) VALUES (:headers[id], :payload)
5459
* </pre>
5560
*
61+
* <p>
62+
* When a message payload is an instance of {@link Iterable}, a
63+
* {@link NamedParameterJdbcOperations#batchUpdate(String, SqlParameterSource[])} is performed, where each
64+
* {@link SqlParameterSource} instance is based on items wrapped into an internal {@link Message} implementation with
65+
* headers from the request message.
66+
* <p>
67+
* When a {@link #preparedStatementSetter} is configured, it is applied for each item in the appropriate
68+
* {@link JdbcOperations#batchUpdate(String, BatchPreparedStatementSetter)} function.
69+
* <p>
70+
* NOTE: The batch update is not supported when {@link #keysGenerated} is in use.
71+
*
5672
* N.B. do not use quotes to escape the header keys. The default SQL parameter source (from Spring JDBC) can also handle
5773
* headers with dotted names (e.g. <code>business.id</code>)
5874
*
5975
* @author Dave Syer
6076
* @author Artem Bilan
77+
*
6178
* @since 2.0
6279
*/
6380
public class JdbcMessageHandler extends AbstractMessageHandler {
6481

6582
private final ResultSetExtractor<List<Map<String, Object>>> generatedKeysResultSetExtractor =
66-
new RowMapperResultSetExtractor<Map<String, Object>>(new ColumnMapRowMapper(), 1);
83+
new RowMapperResultSetExtractor<>(new ColumnMapRowMapper(), 1);
6784

6885
private final NamedParameterJdbcOperations jdbcOperations;
6986

70-
private final PreparedStatementCreator generatedKeysStatementCreator = con ->
71-
con.prepareStatement(JdbcMessageHandler.this.updateSql, Statement.RETURN_GENERATED_KEYS);
87+
private final PreparedStatementCreator generatedKeysStatementCreator =
88+
con -> con.prepareStatement(JdbcMessageHandler.this.updateSql, Statement.RETURN_GENERATED_KEYS);
7289

73-
private volatile String updateSql;
90+
private String updateSql;
7491

75-
private volatile SqlParameterSourceFactory sqlParameterSourceFactory;
92+
private SqlParameterSourceFactory sqlParameterSourceFactory;
7693

77-
private volatile boolean keysGenerated;
94+
private boolean keysGenerated;
7895

7996
private MessagePreparedStatementSetter preparedStatementSetter;
8097

8198
/**
8299
* Constructor taking {@link DataSource} from which the DB Connection can be obtained and the select query to
83100
* execute to retrieve new rows.
84-
*
85101
* @param dataSource Must not be null
86102
* @param updateSql query to execute
87103
*/
@@ -93,13 +109,12 @@ public JdbcMessageHandler(DataSource dataSource, String updateSql) {
93109
/**
94110
* Constructor taking {@link JdbcOperations} instance to use for query execution and the select query to execute to
95111
* retrieve new rows.
96-
*
97112
* @param jdbcOperations instance to use for query execution
98113
* @param updateSql query to execute
99114
*/
100115
public JdbcMessageHandler(JdbcOperations jdbcOperations, String updateSql) {
101116
this.jdbcOperations = new NamedParameterJdbcTemplate(jdbcOperations);
102-
this.updateSql = updateSql;
117+
setUpdateSql(updateSql);
103118
}
104119

105120
/**
@@ -111,7 +126,12 @@ public void setKeysGenerated(boolean keysGenerated) {
111126
this.keysGenerated = keysGenerated;
112127
}
113128

114-
public void setUpdateSql(String updateSql) {
129+
/**
130+
* Configure an SQL statement to perform an UPDATE on the target database.
131+
* @param updateSql the SQL statement to perform.
132+
*/
133+
public final void setUpdateSql(String updateSql) {
134+
Assert.hasText(updateSql, "'updateSql' must not be empty.");
115135
this.updateSql = updateSql;
116136
}
117137

@@ -149,57 +169,111 @@ protected void onInit() throws Exception {
149169
* Executes the update, passing the message into the {@link SqlParameterSourceFactory}.
150170
*/
151171
@Override
152-
protected void handleMessageInternal(Message<?> message) throws Exception {
172+
protected void handleMessageInternal(Message<?> message) {
153173
List<? extends Map<String, Object>> keys = executeUpdateQuery(message, this.keysGenerated);
154174
if (!keys.isEmpty() && logger.isDebugEnabled()) {
155175
logger.debug("Generated keys: " + keys);
156176
}
157177
}
158178

159179
protected List<? extends Map<String, Object>> executeUpdateQuery(final Message<?> message, boolean keysGenerated) {
160-
SqlParameterSource updateParameterSource = EmptySqlParameterSource.INSTANCE;
161-
if (this.preparedStatementSetter == null) {
162-
if (this.sqlParameterSourceFactory != null) {
163-
updateParameterSource = this.sqlParameterSourceFactory.createParameterSource(message);
164-
}
165-
}
166180
if (keysGenerated) {
167181
if (this.preparedStatementSetter != null) {
168-
return this.jdbcOperations.getJdbcOperations().execute(this.generatedKeysStatementCreator,
169-
(PreparedStatementCallback<List<Map<String, Object>>>) ps -> {
170-
JdbcMessageHandler.this.preparedStatementSetter.setValues(ps, message);
171-
ps.executeUpdate();
172-
ResultSet keys = ps.getGeneratedKeys(); // NOSONAR closed in JdbcUtils
173-
if (keys != null) {
174-
try {
175-
176-
return JdbcMessageHandler.this.generatedKeysResultSetExtractor.extractData(keys);
177-
}
178-
finally {
179-
JdbcUtils.closeResultSet(keys);
180-
}
181-
}
182-
return new LinkedList<Map<String, Object>>();
183-
});
182+
return this.jdbcOperations.getJdbcOperations()
183+
.execute(this.generatedKeysStatementCreator,
184+
ps -> {
185+
this.preparedStatementSetter.setValues(ps, message);
186+
ps.executeUpdate();
187+
ResultSet keys = ps.getGeneratedKeys(); // NOSONAR closed in JdbcUtils
188+
if (keys != null) {
189+
try {
190+
191+
return this.generatedKeysResultSetExtractor.extractData(keys);
192+
}
193+
finally {
194+
JdbcUtils.closeResultSet(keys);
195+
}
196+
}
197+
return new LinkedList<>();
198+
});
184199
}
185200
else {
186201
KeyHolder keyHolder = new GeneratedKeyHolder();
187-
this.jdbcOperations.update(this.updateSql, updateParameterSource, keyHolder);
202+
this.jdbcOperations.update(this.updateSql,
203+
this.sqlParameterSourceFactory.createParameterSource(message), keyHolder);
188204
return keyHolder.getKeyList();
189205
}
190206
}
191207
else {
192-
int updated;
193-
if (this.preparedStatementSetter != null) {
194-
updated = this.jdbcOperations.getJdbcOperations().update(this.updateSql,
195-
ps -> JdbcMessageHandler.this.preparedStatementSetter.setValues(ps, message));
208+
if (message.getPayload() instanceof Iterable) {
209+
Stream<? extends Message<?>> messageStream =
210+
StreamSupport.stream(((Iterable<?>) message.getPayload()).spliterator(), false)
211+
.map(payload -> new Message<Object>() {
212+
213+
@Override
214+
public Object getPayload() {
215+
return payload;
216+
}
217+
218+
@Override
219+
public MessageHeaders getHeaders() {
220+
return message.getHeaders();
221+
}
222+
223+
});
224+
225+
int[] updates;
226+
227+
if (this.preparedStatementSetter != null) {
228+
Message<?>[] messages = messageStream.toArray(Message<?>[]::new);
229+
230+
updates = this.jdbcOperations.getJdbcOperations()
231+
.batchUpdate(this.updateSql, new BatchPreparedStatementSetter() {
232+
233+
@Override
234+
public void setValues(PreparedStatement ps, int i) throws SQLException {
235+
JdbcMessageHandler.this.preparedStatementSetter.setValues(ps, messages[i]);
236+
}
237+
238+
@Override
239+
public int getBatchSize() {
240+
return messages.length;
241+
}
242+
243+
});
244+
}
245+
else {
246+
SqlParameterSource[] sqlParameterSources =
247+
messageStream.map(this.sqlParameterSourceFactory::createParameterSource)
248+
.toArray(SqlParameterSource[]::new);
249+
250+
updates = this.jdbcOperations.batchUpdate(this.updateSql, sqlParameterSources);
251+
}
252+
253+
return Arrays.stream(updates)
254+
.mapToObj(updated -> {
255+
Map<String, Object> map = new LinkedCaseInsensitiveMap<>();
256+
map.put("UPDATED", updated);
257+
return map;
258+
})
259+
.collect(Collectors.toList());
196260
}
197261
else {
198-
updated = this.jdbcOperations.update(this.updateSql, updateParameterSource);
262+
int updated;
263+
264+
if (this.preparedStatementSetter != null) {
265+
updated = this.jdbcOperations.getJdbcOperations()
266+
.update(this.updateSql, ps -> this.preparedStatementSetter.setValues(ps, message));
267+
}
268+
else {
269+
updated = this.jdbcOperations.update(this.updateSql,
270+
this.sqlParameterSourceFactory.createParameterSource(message));
271+
}
272+
273+
LinkedCaseInsensitiveMap<Object> map = new LinkedCaseInsensitiveMap<>();
274+
map.put("UPDATED", updated);
275+
return Collections.singletonList(map);
199276
}
200-
LinkedCaseInsensitiveMap<Object> map = new LinkedCaseInsensitiveMap<Object>();
201-
map.put("UPDATED", updated);
202-
return Collections.singletonList(map);
203277
}
204278
}
205279

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/JdbcMessageHandlerIntegrationTests.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertTrue;
2121

22+
import java.util.Arrays;
23+
import java.util.List;
2224
import java.util.Map;
2325
import java.util.concurrent.atomic.AtomicBoolean;
2426

@@ -88,6 +90,24 @@ public void testSimpleDynamicInsert() {
8890
assertEquals("Wrong name", "foo", map.get("NAME"));
8991
}
9092

93+
@Test
94+
public void testInsertBatch() {
95+
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,
96+
"insert into foos (id, status, name) values (:payload, 0, :payload)");
97+
handler.afterPropertiesSet();
98+
99+
Message<List<String>> message = new GenericMessage<>(Arrays.asList("foo1", "foo2", "foo3"));
100+
handler.handleMessage(message);
101+
102+
List<Map<String, Object>> foos = jdbcTemplate.queryForList("SELECT * FROM FOOS ORDER BY id");
103+
104+
assertEquals(3, foos.size());
105+
106+
assertEquals("foo1", foos.get(0).get("NAME"));
107+
assertEquals("foo2", foos.get(1).get("NAME"));
108+
assertEquals("foo3", foos.get(2).get("NAME"));
109+
}
110+
91111
@Test
92112
public void testInsertWithMessagePreparedStatementSetter() {
93113
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,
@@ -105,6 +125,28 @@ public void testInsertWithMessagePreparedStatementSetter() {
105125
assertTrue(setterInvoked.get());
106126
}
107127

128+
@Test
129+
public void testInsertBatchWithMessagePreparedStatementSetter() {
130+
JdbcMessageHandler handler =
131+
new JdbcMessageHandler(jdbcTemplate, "insert into foos (id, status, name) values (?, 0, ?)");
132+
handler.setPreparedStatementSetter((ps, requestMessage) -> {
133+
ps.setObject(1, requestMessage.getPayload());
134+
ps.setObject(2, requestMessage.getPayload());
135+
});
136+
handler.afterPropertiesSet();
137+
138+
Message<List<String>> message = new GenericMessage<>(Arrays.asList("foo1", "foo2", "foo3"));
139+
handler.handleMessage(message);
140+
141+
List<Map<String, Object>> foos = jdbcTemplate.queryForList("SELECT * FROM FOOS ORDER BY id");
142+
143+
assertEquals(3, foos.size());
144+
145+
assertEquals("foo1", foos.get(0).get("NAME"));
146+
assertEquals("foo2", foos.get(1).get("NAME"));
147+
assertEquals("foo3", foos.get(2).get("NAME"));
148+
}
149+
108150
@Test
109151
public void testIdHeaderDynamicInsert() {
110152
JdbcMessageHandler handler = new JdbcMessageHandler(jdbcTemplate,

src/reference/asciidoc/jdbc.adoc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,15 @@ public MessageHandler jdbcMessageHandler(DataSource dataSource) {
261261
====
262262

263263
From the XML configuration perspective, the `prepared-statement-setter` attribute is available on the `<int-jdbc:outbound-channel-adapter>` component.
264-
It lets you specify a `MessagePreparedStatementSetter` bean reference.
264+
It lets you specify a `MessagePreparedStatementSetter` bean reference.
265+
266+
==== Batch Update
267+
268+
Starting with version 5.1, the `JdbcMessageHandler` performs a `JdbcOperations.batchUpdate()` if the payload of the request message is an `Iterable` instance.
269+
Each element of the `Iterable` is wrapped to a `Message` with the headers from the request message.
270+
In the case of regular `SqlParameterSourceFactory`-based configuration these messages are used to build an `SqlParameterSource[]` for an argument used in the mentioned `JdbcOperations.batchUpdate()` function.
271+
When a `MessagePreparedStatementSetter` configuration is applied, a `BatchPreparedStatementSetter` variant is used to iterate over those messages for each item and the provided `MessagePreparedStatementSetter` is called against them.
272+
The batch update is not supported when `keysGenerated` mode is selected.
265273

266274
[[jdbc-outbound-gateway]]
267275
=== Outbound Gateway

src/reference/asciidoc/whats-new.adoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ See <<amqp-content-type>> for more information.
109109

110110
A confusing `max-rows-per-poll` property on the JDBC Inbound Channel Adapter and JDBC Outbound Gateway has been deprecated in favor of the newly introduced `max-rows` property.
111111

112+
The `JdbcMessageHandler` supports now a `batchUpdate` functionality when the payload of the request message is an instance of an `Iterable` type.
113+
112114
See <<jdbc>> for more information.
113115

114116
[[x5.1-ftp-sftp]]

0 commit comments

Comments
 (0)