Skip to content

Commit 014bb71

Browse files
ctailor2schauder
authored andcommitted
Batch inserts of referenced entities.
Insert for entities of same type within an aggregate get inserted using JDBC batch operations when possible. Inserts are supported when no id needs to be generated by the database or if the Dialect supports generation of ids in batch operations. Closes #1159 Original pull request # 1191
1 parent aafbce3 commit 014bb71

File tree

50 files changed

+2757
-796
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2757
-796
lines changed

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/AggregateChangeExecutor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828
*
2929
* @author Jens Schauder
3030
* @author Myeonghyeon Lee
31+
* @author Chirag Tailor
3132
* @since 2.0
3233
*/
3334
class AggregateChangeExecutor {
@@ -66,6 +67,8 @@ private void execute(DbAction<?> action, JdbcAggregateChangeExecutionContext exe
6667
executionContext.executeInsertRoot((DbAction.InsertRoot<?>) action);
6768
} else if (action instanceof DbAction.Insert) {
6869
executionContext.executeInsert((DbAction.Insert<?>) action);
70+
} else if (action instanceof DbAction.InsertBatch) {
71+
executionContext.executeInsertBatch((DbAction.InsertBatch<?>) action);
6972
} else if (action instanceof DbAction.UpdateRoot) {
7073
executionContext.executeUpdateRoot((DbAction.UpdateRoot<?>) action);
7174
} else if (action instanceof DbAction.Update) {

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/JdbcAggregateChangeExecutionContext.java

+21-16
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,13 +25,15 @@
2525
import java.util.Map;
2626
import java.util.Set;
2727
import java.util.function.BiConsumer;
28+
import java.util.stream.Collectors;
2829

2930
import org.springframework.dao.IncorrectUpdateSemanticsDataAccessException;
3031
import org.springframework.dao.OptimisticLockingFailureException;
3132
import org.springframework.data.jdbc.core.convert.DataAccessStrategy;
3233
import org.springframework.data.jdbc.core.convert.Identifier;
3334
import org.springframework.data.jdbc.core.convert.JdbcConverter;
3435
import org.springframework.data.jdbc.core.convert.JdbcIdentifierBuilder;
36+
import org.springframework.data.jdbc.core.convert.InsertSubject;
3537
import org.springframework.data.mapping.PersistentProperty;
3638
import org.springframework.data.mapping.PersistentPropertyAccessor;
3739
import org.springframework.data.mapping.PersistentPropertyPath;
@@ -51,6 +53,7 @@
5153
* @author Jens Schauder
5254
* @author Umut Erturk
5355
* @author Myeonghyeon Lee
56+
* @author Chirag Tailor
5457
*/
5558
class JdbcAggregateChangeExecutionContext {
5659

@@ -87,11 +90,11 @@ <T> void executeInsertRoot(DbAction.InsertRoot<T> insert) {
8790
T rootEntity = RelationalEntityVersionUtils.setVersionNumberOnEntity( //
8891
insert.getEntity(), initialVersion, persistentEntity, converter);
8992

90-
id = accessStrategy.insert(rootEntity, insert.getEntityType(), Identifier.empty());
93+
id = accessStrategy.insert(rootEntity, insert.getEntityType(), Identifier.empty(), insert.getIdValueSource());
9194

9295
setNewVersion(initialVersion);
9396
} else {
94-
id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), Identifier.empty());
97+
id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), Identifier.empty(), insert.getIdValueSource());
9598
}
9699

97100
add(new DbActionExecutionResult(insert, id));
@@ -100,10 +103,24 @@ <T> void executeInsertRoot(DbAction.InsertRoot<T> insert) {
100103
<T> void executeInsert(DbAction.Insert<T> insert) {
101104

102105
Identifier parentKeys = getParentKeys(insert, converter);
103-
Object id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), parentKeys);
106+
Object id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), parentKeys, insert.getIdValueSource());
104107
add(new DbActionExecutionResult(insert, id));
105108
}
106109

110+
<T> void executeInsertBatch(DbAction.InsertBatch<T> insertBatch) {
111+
112+
List<DbAction.Insert<T>> inserts = insertBatch.getInserts();
113+
List<InsertSubject<T>> insertSubjects = inserts.stream()
114+
.map(insert -> InsertSubject.describedBy(insert.getEntity(), getParentKeys(insert, converter)))
115+
.collect(Collectors.toList());
116+
117+
Object[] ids = accessStrategy.insert(insertSubjects, insertBatch.getEntityType(), insertBatch.getIdValueSource());
118+
119+
for (int i = 0; i < inserts.size(); i++) {
120+
add(new DbActionExecutionResult(inserts.get(i), ids.length > 0 ? ids[i] : null));
121+
}
122+
}
123+
107124
<T> void executeUpdateRoot(DbAction.UpdateRoot<T> update) {
108125

109126
RelationalPersistentEntity<T> persistentEntity = getRequiredPersistentEntity(update.getEntityType());
@@ -155,18 +172,6 @@ <T> void executeDeleteAll(DbAction.DeleteAll<T> delete) {
155172
accessStrategy.deleteAll(delete.getPropertyPath());
156173
}
157174

158-
<T> void executeMerge(DbAction.Merge<T> merge) {
159-
160-
// temporary implementation
161-
if (!accessStrategy.update(merge.getEntity(), merge.getEntityType())) {
162-
163-
Object id = accessStrategy.insert(merge.getEntity(), merge.getEntityType(), getParentKeys(merge, converter));
164-
add(new DbActionExecutionResult(merge, id));
165-
} else {
166-
add(new DbActionExecutionResult());
167-
}
168-
}
169-
170175
<T> void executeAcquireLock(DbAction.AcquireLockRoot<T> acquireLock) {
171176
accessStrategy.acquireLockById(acquireLock.getId(), LockMode.PESSIMISTIC_WRITE, acquireLock.getEntityType());
172177
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.core.convert;
17+
18+
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
19+
20+
/**
21+
* Strategy for executing a batch insert.
22+
*
23+
* @author Chirag Tailor
24+
* @since 2.4
25+
*/
26+
interface BatchInsertStrategy {
27+
28+
/**
29+
* @param sql the insert sql. Must not be {@code null}.
30+
* @param sqlParameterSources the sql parameters for each record to be inserted. Must not be {@code null}.
31+
* @return the ids corresponding to each record that was inserted, if ids were generated. If ids were not generated,
32+
* elements will be {@code null}.
33+
* @since 2.4
34+
*/
35+
Object[] execute(String sql, SqlParameterSource[] sqlParameterSources);
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.jdbc.core.convert;
17+
18+
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
19+
import org.springframework.jdbc.core.ColumnMapRowMapper;
20+
import org.springframework.jdbc.core.JdbcOperations;
21+
import org.springframework.jdbc.core.PreparedStatementCallback;
22+
import org.springframework.jdbc.core.PreparedStatementCreator;
23+
import org.springframework.jdbc.core.PreparedStatementCreatorFactory;
24+
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
25+
import org.springframework.jdbc.core.SqlParameter;
26+
import org.springframework.jdbc.core.namedparam.NamedParameterUtils;
27+
import org.springframework.jdbc.core.namedparam.ParsedSql;
28+
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
29+
import org.springframework.jdbc.support.JdbcUtils;
30+
import org.springframework.jdbc.support.KeyHolder;
31+
import org.springframework.lang.Nullable;
32+
import org.springframework.util.Assert;
33+
34+
import java.sql.PreparedStatement;
35+
import java.sql.ResultSet;
36+
import java.sql.SQLException;
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
/**
42+
* Counterpart to {@link org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations} containing
43+
* methods for performing batch updates with generated keys.
44+
*
45+
* @author Chirag Tailor
46+
* @since 2.4
47+
*/
48+
public class BatchJdbcOperations {
49+
private final JdbcOperations jdbcOperations;
50+
51+
public BatchJdbcOperations(JdbcOperations jdbcOperations) {
52+
this.jdbcOperations = jdbcOperations;
53+
}
54+
55+
/**
56+
* Execute a batch using the supplied SQL statement with the batch of supplied arguments,
57+
* returning generated keys.
58+
* @param sql the SQL statement to execute
59+
* @param batchArgs the array of {@link SqlParameterSource} containing the batch of
60+
* arguments for the query
61+
* @param generatedKeyHolder a {@link KeyHolder} that will hold the generated keys
62+
* @return an array containing the numbers of rows affected by each update in the batch
63+
* (may also contain special JDBC-defined negative values for affected rows such as
64+
* {@link java.sql.Statement#SUCCESS_NO_INFO}/{@link java.sql.Statement#EXECUTE_FAILED})
65+
* @throws org.springframework.dao.DataAccessException if there is any problem issuing the update
66+
* @see org.springframework.jdbc.support.GeneratedKeyHolder
67+
* @since 2.4
68+
*/
69+
int[] batchUpdate(String sql, SqlParameterSource[] batchArgs, KeyHolder generatedKeyHolder) {
70+
return batchUpdate(sql, batchArgs, generatedKeyHolder, null);
71+
}
72+
73+
/**
74+
* Execute a batch using the supplied SQL statement with the batch of supplied arguments,
75+
* returning generated keys.
76+
* @param sql the SQL statement to execute
77+
* @param batchArgs the array of {@link SqlParameterSource} containing the batch of
78+
* arguments for the query
79+
* @param generatedKeyHolder a {@link KeyHolder} that will hold the generated keys
80+
* @param keyColumnNames names of the columns that will have keys generated for them
81+
* @return an array containing the numbers of rows affected by each update in the batch
82+
* (may also contain special JDBC-defined negative values for affected rows such as
83+
* {@link java.sql.Statement#SUCCESS_NO_INFO}/{@link java.sql.Statement#EXECUTE_FAILED})
84+
* @throws org.springframework.dao.DataAccessException if there is any problem issuing the update
85+
* @see org.springframework.jdbc.support.GeneratedKeyHolder
86+
* @since 2.4
87+
*/
88+
int[] batchUpdate(String sql, SqlParameterSource[] batchArgs, KeyHolder generatedKeyHolder,
89+
@Nullable String[] keyColumnNames) {
90+
91+
if (batchArgs.length == 0) {
92+
return new int[0];
93+
}
94+
95+
ParsedSql parsedSql = NamedParameterUtils.parseSqlStatement(sql);
96+
SqlParameterSource paramSource = batchArgs[0];
97+
String sqlToUse = NamedParameterUtils.substituteNamedParameters(parsedSql, paramSource);
98+
List<SqlParameter> declaredParameters = NamedParameterUtils.buildSqlParameterList(parsedSql, paramSource);
99+
PreparedStatementCreatorFactory pscf = new PreparedStatementCreatorFactory(sqlToUse, declaredParameters);
100+
if (keyColumnNames != null) {
101+
pscf.setGeneratedKeysColumnNames(keyColumnNames);
102+
} else {
103+
pscf.setReturnGeneratedKeys(true);
104+
}
105+
Object[] params = NamedParameterUtils.buildValueArray(parsedSql, paramSource, null);
106+
PreparedStatementCreator psc = pscf.newPreparedStatementCreator(params);
107+
BatchPreparedStatementSetter bpss = new BatchPreparedStatementSetter() {
108+
@Override
109+
public void setValues(PreparedStatement ps, int i) throws SQLException {
110+
Object[] values = NamedParameterUtils.buildValueArray(parsedSql, batchArgs[i], null);
111+
pscf.newPreparedStatementSetter(values).setValues(ps);
112+
}
113+
114+
@Override
115+
public int getBatchSize() {
116+
return batchArgs.length;
117+
}
118+
};
119+
PreparedStatementCallback<int[]> preparedStatementCallback = ps -> {
120+
int batchSize = bpss.getBatchSize();
121+
generatedKeyHolder.getKeyList().clear();
122+
if (JdbcUtils.supportsBatchUpdates(ps.getConnection())) {
123+
for (int i = 0; i < batchSize; i++) {
124+
bpss.setValues(ps, i);
125+
ps.addBatch();
126+
}
127+
int[] results = ps.executeBatch();
128+
storeGeneratedKeys(generatedKeyHolder, ps, batchSize);
129+
return results;
130+
} else {
131+
List<Integer> rowsAffected = new ArrayList<>();
132+
for (int i = 0; i < batchSize; i++) {
133+
bpss.setValues(ps, i);
134+
rowsAffected.add(ps.executeUpdate());
135+
storeGeneratedKeys(generatedKeyHolder, ps, 1);
136+
}
137+
int[] rowsAffectedArray = new int[rowsAffected.size()];
138+
for (int i = 0; i < rowsAffectedArray.length; i++) {
139+
rowsAffectedArray[i] = rowsAffected.get(i);
140+
}
141+
return rowsAffectedArray;
142+
}
143+
};
144+
int[] result = jdbcOperations.execute(psc, preparedStatementCallback);
145+
Assert.state(result != null, "No result array");
146+
return result;
147+
}
148+
149+
private void storeGeneratedKeys(KeyHolder generatedKeyHolder, PreparedStatement ps, int rowsExpected) throws SQLException {
150+
151+
List<Map<String, Object>> generatedKeys = generatedKeyHolder.getKeyList();
152+
ResultSet keys = ps.getGeneratedKeys();
153+
if (keys != null) {
154+
try {
155+
RowMapperResultSetExtractor<Map<String, Object>> rse =
156+
new RowMapperResultSetExtractor<>(new ColumnMapRowMapper(), rowsExpected);
157+
//noinspection ConstantConditions
158+
generatedKeys.addAll(rse.extractData(keys));
159+
}
160+
finally {
161+
JdbcUtils.closeResultSet(keys);
162+
}
163+
}
164+
}
165+
}

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/convert/CascadingDataAccessStrategy.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -23,6 +23,7 @@
2323
import org.springframework.data.domain.Pageable;
2424
import org.springframework.data.domain.Sort;
2525
import org.springframework.data.mapping.PersistentPropertyPath;
26+
import org.springframework.data.relational.core.conversion.IdValueSource;
2627
import org.springframework.data.relational.core.mapping.RelationalPersistentProperty;
2728
import org.springframework.data.relational.core.sql.LockMode;
2829

@@ -35,6 +36,7 @@
3536
* @author Tyler Van Gorder
3637
* @author Milan Milanov
3738
* @author Myeonghyeon Lee
39+
* @author Chirag Tailor
3840
* @since 1.1
3941
*/
4042
public class CascadingDataAccessStrategy implements DataAccessStrategy {
@@ -45,13 +47,23 @@ public CascadingDataAccessStrategy(List<DataAccessStrategy> strategies) {
4547
this.strategies = new ArrayList<>(strategies);
4648
}
4749

50+
@Override
51+
public <T> Object insert(T instance, Class<T> domainType, Identifier identifier) {
52+
return collect(das -> das.insert(instance, domainType, identifier));
53+
}
54+
4855
/*
4956
* (non-Javadoc)
5057
* @see org.springframework.data.jdbc.core.DataAccessStrategy#insert(java.lang.Object, java.lang.Class, org.springframework.data.jdbc.core.ParentKeys)
5158
*/
5259
@Override
53-
public <T> Object insert(T instance, Class<T> domainType, Identifier identifier) {
54-
return collect(das -> das.insert(instance, domainType, identifier));
60+
public <T> Object insert(T instance, Class<T> domainType, Identifier identifier, IdValueSource idValueSource) {
61+
return collect(das -> das.insert(instance, domainType, identifier, idValueSource));
62+
}
63+
64+
@Override
65+
public <T> Object[] insert(List<InsertSubject<T>> insertSubjects, Class<T> domainType, IdValueSource idValueSource) {
66+
return collect(das -> das.insert(insertSubjects, domainType, idValueSource));
5567
}
5668

5769
/*

0 commit comments

Comments
 (0)