Skip to content

Commit 6b02a4e

Browse files
ctailor2schauder
authored andcommitted
Add SaveMergedAggregateChange which merges AggregateChangeWithRoot changes into one.
Remove behavior from WritingContext for creating InsertBatch in favor of SaveMergedAggregateChange. Update all save paths to use SaveMergedAggregateChange. + Update #populateIdsIfNecessary return type from T to List<T> Pull out an abstract BatchWithValue class from InsertBatch to use it for batching root inserts as well. Rename InsertBatch to BatchInsert Rename AggregateChangeWithRoot to RootAggregateChange. Original pull request #1211
1 parent d3d0503 commit 6b02a4e

File tree

36 files changed

+1208
-393
lines changed

36 files changed

+1208
-393
lines changed

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/AggregateChangeExecutor.java

+10-9
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,12 @@
1818
import org.springframework.data.jdbc.core.convert.DataAccessStrategy;
1919
import org.springframework.data.jdbc.core.convert.JdbcConverter;
2020
import org.springframework.data.relational.core.conversion.AggregateChange;
21-
import org.springframework.data.relational.core.conversion.AggregateChangeWithRoot;
2221
import org.springframework.data.relational.core.conversion.DbAction;
2322
import org.springframework.data.relational.core.conversion.DbActionExecutionException;
2423
import org.springframework.data.relational.core.conversion.MutableAggregateChange;
2524

25+
import java.util.List;
26+
2627
/**
2728
* Executes an {@link MutableAggregateChange}.
2829
*
@@ -43,15 +44,15 @@ class AggregateChangeExecutor {
4344
}
4445

4546
/**
46-
* Execute an aggregate change which has a root entity. It returns the root entity, with all changes that might apply.
47-
* This might be the original instance or a new instance, depending on its mutability.
47+
* Execute a save aggregate change. It returns the resulting root entities, with all changes that might apply. This
48+
* might be the original instances or new instances, depending on their mutability.
4849
*
4950
* @param aggregateChange the aggregate change to be executed. Must not be {@literal null}.
5051
* @param <T> the type of the aggregate root.
51-
* @return the potentially modified aggregate root. Guaranteed to be not {@literal null}.
52+
* @return the aggregate roots resulting from the change, if there are any. May be empty.
5253
* @since 3.0
5354
*/
54-
<T> T execute(AggregateChangeWithRoot<T> aggregateChange) {
55+
<T> List<T> executeSave(AggregateChange<T> aggregateChange) {
5556

5657
JdbcAggregateChangeExecutionContext executionContext = new JdbcAggregateChangeExecutionContext(converter,
5758
accessStrategy);
@@ -62,13 +63,13 @@ <T> T execute(AggregateChangeWithRoot<T> aggregateChange) {
6263
}
6364

6465
/**
65-
* Execute an aggregate change without a root entity.
66+
* Execute a delete aggregate change.
6667
*
6768
* @param aggregateChange the aggregate change to be executed. Must not be {@literal null}.
6869
* @param <T> the type of the aggregate root.
6970
* @since 3.0
7071
*/
71-
<T> void execute(AggregateChange<T> aggregateChange) {
72+
<T> void executeDelete(AggregateChange<T> aggregateChange) {
7273

7374
JdbcAggregateChangeExecutionContext executionContext = new JdbcAggregateChangeExecutionContext(converter,
7475
accessStrategy);
@@ -83,8 +84,8 @@ private void execute(DbAction<?> action, JdbcAggregateChangeExecutionContext exe
8384
executionContext.executeInsertRoot((DbAction.InsertRoot<?>) action);
8485
} else if (action instanceof DbAction.Insert) {
8586
executionContext.executeInsert((DbAction.Insert<?>) action);
86-
} else if (action instanceof DbAction.InsertBatch) {
87-
executionContext.executeInsertBatch((DbAction.InsertBatch<?>) action);
87+
} else if (action instanceof DbAction.BatchInsert) {
88+
executionContext.executeBatchInsert((DbAction.BatchInsert<?>) action);
8889
} else if (action instanceof DbAction.UpdateRoot) {
8990
executionContext.executeUpdateRoot((DbAction.UpdateRoot<?>) action);
9091
} else if (action instanceof DbAction.Delete) {

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/JdbcAggregateChangeExecutionContext.java

+16-8
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,14 @@ <T> void executeInsert(DbAction.Insert<T> insert) {
8383
add(new DbActionExecutionResult(insert, id));
8484
}
8585

86-
<T> void executeInsertBatch(DbAction.InsertBatch<T> insertBatch) {
86+
<T> void executeBatchInsert(DbAction.BatchInsert<T> batchInsert) {
8787

88-
List<DbAction.Insert<T>> inserts = insertBatch.getInserts();
88+
List<DbAction.Insert<T>> inserts = batchInsert.getActions();
8989
List<InsertSubject<T>> insertSubjects = inserts.stream()
9090
.map(insert -> InsertSubject.describedBy(insert.getEntity(), getParentKeys(insert, converter)))
9191
.collect(Collectors.toList());
9292

93-
Object[] ids = accessStrategy.insert(insertSubjects, insertBatch.getEntityType(), insertBatch.getIdValueSource());
93+
Object[] ids = accessStrategy.insert(insertSubjects, batchInsert.getEntityType(), batchInsert.getBatchValue());
9494

9595
for (int i = 0; i < inserts.size(); i++) {
9696
add(new DbActionExecutionResult(inserts.get(i), ids.length > 0 ? ids[i] : null));
@@ -216,14 +216,16 @@ private Object getIdFrom(DbAction.WithEntity<?> idOwningAction) {
216216
return identifier;
217217
}
218218

219-
<T> T populateIdsIfNecessary() {
219+
<T> List<T> populateIdsIfNecessary() {
220220

221221
// have the results so that the inserts on the leaves come first.
222222
List<DbActionExecutionResult> reverseResults = new ArrayList<>(results.values());
223223
Collections.reverse(reverseResults);
224224

225225
StagedValues cascadingValues = new StagedValues();
226226

227+
List<T> roots = new ArrayList<>(reverseResults.size());
228+
227229
for (DbActionExecutionResult result : reverseResults) {
228230

229231
DbAction.WithEntity<?> action = result.getAction();
@@ -232,7 +234,7 @@ <T> T populateIdsIfNecessary() {
232234

233235
if (action instanceof DbAction.InsertRoot || action instanceof DbAction.UpdateRoot) {
234236
// noinspection unchecked
235-
return (T) newEntity;
237+
roots.add((T) newEntity);
236238
}
237239

238240
// the id property was immutable so we have to propagate changes up the tree
@@ -246,9 +248,15 @@ <T> T populateIdsIfNecessary() {
246248
}
247249
}
248250

249-
throw new IllegalStateException(
250-
String.format("Cannot retrieve the resulting instance unless a %s or %s action was successfully executed.",
251-
DbAction.InsertRoot.class.getName(), DbAction.UpdateRoot.class.getName()));
251+
if (roots.isEmpty()) {
252+
throw new IllegalStateException(
253+
String.format("Cannot retrieve the resulting instance(s) unless a %s or %s action was successfully executed.",
254+
DbAction.InsertRoot.class.getName(), DbAction.UpdateRoot.class.getName()));
255+
}
256+
257+
Collections.reverse(roots);
258+
259+
return roots;
252260
}
253261

254262
private <S> Object setIdAndCascadingProperties(DbAction.WithEntity<S> action, @Nullable Object generatedId,

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/JdbcAggregateOperations.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
* @author Jens Schauder
2727
* @author Thomas Lang
2828
* @author Milan Milanov
29+
* @author Chirag Tailor
2930
*/
3031
public interface JdbcAggregateOperations {
3132

@@ -38,6 +39,16 @@ public interface JdbcAggregateOperations {
3839
*/
3940
<T> T save(T instance);
4041

42+
/**
43+
* Saves all aggregate instances, including all the members of each aggregate instance.
44+
*
45+
* @param instances the aggregate roots to be saved. Must not be {@code null}.
46+
* @param <T> the type of the aggregate root.
47+
* @return the saved instances.
48+
* @since 3.0
49+
*/
50+
<T> Iterable<T> saveAll(Iterable<T> instances);
51+
4152
/**
4253
* Dedicated insert function. This skips the test if the aggregate root is new and makes an insert.
4354
* <p>

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/core/JdbcAggregateTemplate.java

+81-29
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.springframework.data.jdbc.core;
1717

1818
import java.util.ArrayList;
19+
import java.util.Iterator;
1920
import java.util.List;
2021
import java.util.function.Function;
2122
import java.util.stream.Collectors;
@@ -31,7 +32,8 @@
3132
import org.springframework.data.mapping.IdentifierAccessor;
3233
import org.springframework.data.mapping.callback.EntityCallbacks;
3334
import org.springframework.data.relational.core.conversion.AggregateChange;
34-
import org.springframework.data.relational.core.conversion.AggregateChangeWithRoot;
35+
import org.springframework.data.relational.core.conversion.RootAggregateChange;
36+
import org.springframework.data.relational.core.conversion.BatchingAggregateChange;
3537
import org.springframework.data.relational.core.conversion.MutableAggregateChange;
3638
import org.springframework.data.relational.core.conversion.RelationalEntityDeleteWriter;
3739
import org.springframework.data.relational.core.conversion.RelationalEntityInsertWriter;
@@ -44,6 +46,7 @@
4446
import org.springframework.data.support.PageableExecutionUtils;
4547
import org.springframework.lang.Nullable;
4648
import org.springframework.util.Assert;
49+
import org.springframework.util.ClassUtils;
4750

4851
/**
4952
* {@link JdbcAggregateOperations} implementation, storing aggregates in and obtaining them from a JDBC data store.
@@ -141,13 +144,15 @@ public <T> T save(T instance) {
141144

142145
Assert.notNull(instance, "Aggregate instance must not be null!");
143146

144-
RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());
147+
return performSave(instance, changeCreatorSelectorForSave(instance));
148+
}
145149

146-
Function<T, AggregateChangeWithRoot<T>> changeCreator = persistentEntity.isNew(instance)
147-
? entity -> createInsertChange(prepareVersionForInsert(entity))
148-
: entity -> createUpdateChange(prepareVersionForUpdate(entity));
150+
@Override
151+
public <T> Iterable<T> saveAll(Iterable<T> instances) {
152+
153+
Assert.isTrue(instances.iterator().hasNext(), "Aggregate instances must not be empty!");
149154

150-
return store(instance, changeCreator, persistentEntity);
155+
return performSaveAll(instances);
151156
}
152157

153158
/**
@@ -162,9 +167,7 @@ public <T> T insert(T instance) {
162167

163168
Assert.notNull(instance, "Aggregate instance must not be null!");
164169

165-
RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());
166-
167-
return store(instance, entity -> createInsertChange(prepareVersionForInsert(entity)), persistentEntity);
170+
return performSave(instance, entity -> createInsertChange(prepareVersionForInsert(entity)));
168171
}
169172

170173
/**
@@ -179,9 +182,7 @@ public <T> T update(T instance) {
179182

180183
Assert.notNull(instance, "Aggregate instance must not be null!");
181184

182-
RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());
183-
184-
return store(instance, entity -> createUpdateChange(prepareVersionForUpdate(entity)), persistentEntity);
185+
return performSave(instance, entity -> createUpdateChange(prepareVersionForUpdate(entity)));
185186
}
186187

187188
@Override
@@ -280,29 +281,33 @@ public void deleteAll(Class<?> domainType) {
280281
Assert.notNull(domainType, "Domain type must not be null!");
281282

282283
MutableAggregateChange<?> change = createDeletingChange(domainType);
283-
executor.execute(change);
284+
executor.executeDelete(change);
285+
}
286+
287+
private <T> T afterExecute(AggregateChange<T> change, T entityAfterExecution) {
288+
289+
Object identifier = context.getRequiredPersistentEntity(change.getEntityType())
290+
.getIdentifierAccessor(entityAfterExecution).getIdentifier();
291+
292+
Assert.notNull(identifier, "After saving the identifier must not be null!");
293+
294+
return triggerAfterSave(entityAfterExecution, change);
284295
}
285296

286-
private <T> T store(T aggregateRoot, Function<T, AggregateChangeWithRoot<T>> changeCreator,
287-
RelationalPersistentEntity<?> persistentEntity) {
297+
private <T> RootAggregateChange<T> beforeExecute(T aggregateRoot,
298+
Function<T, RootAggregateChange<T>> changeCreator) {
288299

289300
Assert.notNull(aggregateRoot, "Aggregate instance must not be null!");
290301

291302
aggregateRoot = triggerBeforeConvert(aggregateRoot);
292303

293-
AggregateChangeWithRoot<T> change = changeCreator.apply(aggregateRoot);
304+
RootAggregateChange<T> change = changeCreator.apply(aggregateRoot);
294305

295306
aggregateRoot = triggerBeforeSave(change.getRoot(), change);
296307

297308
change.setRoot(aggregateRoot);
298309

299-
T entityAfterExecution = executor.execute(change);
300-
301-
Object identifier = persistentEntity.getIdentifierAccessor(entityAfterExecution).getIdentifier();
302-
303-
Assert.notNull(identifier, "After saving the identifier must not be null!");
304-
305-
return triggerAfterSave(entityAfterExecution, change);
310+
return change;
306311
}
307312

308313
private <T> void deleteTree(Object id, @Nullable T entity, Class<T> domainType) {
@@ -311,23 +316,70 @@ private <T> void deleteTree(Object id, @Nullable T entity, Class<T> domainType)
311316

312317
entity = triggerBeforeDelete(entity, id, change);
313318

314-
executor.execute(change);
319+
executor.executeDelete(change);
315320

316321
triggerAfterDelete(entity, id, change);
317322
}
318323

319-
private <T> AggregateChangeWithRoot<T> createInsertChange(T instance) {
324+
private <T> T performSave(T instance, Function<T, RootAggregateChange<T>> changeCreator) {
325+
326+
// noinspection unchecked
327+
BatchingAggregateChange<T, RootAggregateChange<T>> batchingAggregateChange = //
328+
BatchingAggregateChange.forSave((Class<T>) ClassUtils.getUserClass(instance));
329+
batchingAggregateChange.add(beforeExecute(instance, changeCreator));
330+
331+
Iterator<T> afterExecutionIterator = executor.executeSave(batchingAggregateChange).iterator();
332+
333+
Assert.isTrue(afterExecutionIterator.hasNext(), "Instances after execution must not be empty!");
334+
335+
return afterExecute(batchingAggregateChange, afterExecutionIterator.next());
336+
}
337+
338+
private <T> List<T> performSaveAll(Iterable<T> instances) {
339+
340+
Iterator<T> iterator = instances.iterator();
341+
T firstInstance = iterator.next();
342+
343+
// noinspection unchecked
344+
BatchingAggregateChange<T, RootAggregateChange<T>> batchingAggregateChange = //
345+
BatchingAggregateChange.forSave((Class<T>) ClassUtils.getUserClass(firstInstance));
346+
batchingAggregateChange.add(beforeExecute(firstInstance, changeCreatorSelectorForSave(firstInstance)));
347+
348+
while (iterator.hasNext()) {
349+
T instance = iterator.next();
350+
batchingAggregateChange.add(beforeExecute(instance, changeCreatorSelectorForSave(instance)));
351+
}
352+
353+
List<T> instancesAfterExecution = executor.executeSave(batchingAggregateChange);
354+
355+
ArrayList<T> results = new ArrayList<>(instancesAfterExecution.size());
356+
for (T instance : instancesAfterExecution) {
357+
results.add(afterExecute(batchingAggregateChange, instance));
358+
}
359+
360+
return results;
361+
}
362+
363+
private <T> Function<T, RootAggregateChange<T>> changeCreatorSelectorForSave(T instance) {
364+
365+
return context.getRequiredPersistentEntity(instance.getClass()).isNew(instance)
366+
? entity -> createInsertChange(prepareVersionForInsert(entity))
367+
: entity -> createUpdateChange(prepareVersionForUpdate(entity));
368+
}
369+
370+
private <T> RootAggregateChange<T> createInsertChange(T instance) {
320371

321-
AggregateChangeWithRoot<T> aggregateChange = MutableAggregateChange.forSave(instance);
372+
RootAggregateChange<T> aggregateChange = MutableAggregateChange.forSave(instance);
322373
new RelationalEntityInsertWriter<T>(context).write(instance, aggregateChange);
323374
return aggregateChange;
324375
}
325376

326-
private <T> AggregateChangeWithRoot<T> createUpdateChange(EntityAndPreviousVersion<T> entityAndVersion) {
377+
private <T> RootAggregateChange<T> createUpdateChange(EntityAndPreviousVersion<T> entityAndVersion) {
327378

328-
AggregateChangeWithRoot<T> aggregateChange = MutableAggregateChange.forSave(entityAndVersion.entity,
379+
RootAggregateChange<T> aggregateChange = MutableAggregateChange.forSave(entityAndVersion.entity,
329380
entityAndVersion.version);
330-
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity, aggregateChange);
381+
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity,
382+
aggregateChange);
331383
return aggregateChange;
332384
}
333385

spring-data-jdbc/src/main/java/org/springframework/data/jdbc/repository/support/SimpleJdbcRepository.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2021 the original author or authors.
2+
* Copyright 2017-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,7 +16,6 @@
1616
package org.springframework.data.jdbc.repository.support;
1717

1818
import java.util.Optional;
19-
import java.util.stream.Collectors;
2019

2120
import org.springframework.data.domain.Page;
2221
import org.springframework.data.domain.Pageable;
@@ -25,7 +24,6 @@
2524
import org.springframework.data.mapping.PersistentEntity;
2625
import org.springframework.data.repository.CrudRepository;
2726
import org.springframework.data.repository.PagingAndSortingRepository;
28-
import org.springframework.data.util.Streamable;
2927
import org.springframework.transaction.annotation.Transactional;
3028
import org.springframework.util.Assert;
3129

@@ -35,6 +33,7 @@
3533
* @author Jens Schauder
3634
* @author Oliver Gierke
3735
* @author Milan Milanov
36+
* @author Chirag Tailor
3837
*/
3938
@Transactional(readOnly = true)
4039
public class SimpleJdbcRepository<T, ID> implements CrudRepository<T,ID>, PagingAndSortingRepository<T, ID> {
@@ -60,10 +59,7 @@ public <S extends T> S save(S instance) {
6059
@Transactional
6160
@Override
6261
public <S extends T> Iterable<S> saveAll(Iterable<S> entities) {
63-
64-
return Streamable.of(entities).stream() //
65-
.map(this::save) //
66-
.collect(Collectors.toList());
62+
return entityOperations.saveAll(entities);
6763
}
6864

6965
@Override

0 commit comments

Comments
 (0)