Skip to content

Batch non-root inserts across aggregates #1211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-relational-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-537-batch-ops-across-aggregates-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data Relational Parent</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-jdbc-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-relational-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-537-batch-ops-across-aggregates-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
4 changes: 2 additions & 2 deletions spring-data-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>

<artifactId>spring-data-jdbc</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-537-batch-ops-across-aggregates-SNAPSHOT</version>

<name>Spring Data JDBC</name>
<description>Spring Data module for JDBC repositories.</description>
Expand All @@ -15,7 +15,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-relational-parent</artifactId>
<version>3.0.0-SNAPSHOT</version>
<version>3.0.0-537-batch-ops-across-aggregates-SNAPSHOT</version>
</parent>

<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import org.springframework.data.jdbc.core.convert.DataAccessStrategy;
import org.springframework.data.jdbc.core.convert.JdbcConverter;
import org.springframework.data.relational.core.conversion.AggregateChange;
import org.springframework.data.relational.core.conversion.AggregateChangeWithRoot;
import org.springframework.data.relational.core.conversion.DbAction;
import org.springframework.data.relational.core.conversion.DbActionExecutionException;
import org.springframework.data.relational.core.conversion.MutableAggregateChange;

import java.util.List;

/**
* Executes an {@link MutableAggregateChange}.
*
Expand All @@ -43,15 +44,15 @@ class AggregateChangeExecutor {
}

/**
* Execute an aggregate change which has a root entity. It returns the root entity, with all changes that might apply.
* This might be the original instance or a new instance, depending on its mutability.
* Execute a save aggregate change. It returns the resulting root entities, with all changes that might apply. This
* might be the original instances or new instances, depending on their mutability.
*
* @param aggregateChange the aggregate change to be executed. Must not be {@literal null}.
* @param <T> the type of the aggregate root.
* @return the potentially modified aggregate root. Guaranteed to be not {@literal null}.
* @return the aggregate roots resulting from the change, if there are any. May be empty.
* @since 3.0
*/
<T> T execute(AggregateChangeWithRoot<T> aggregateChange) {
<T> List<T> executeSave(AggregateChange<T> aggregateChange) {

JdbcAggregateChangeExecutionContext executionContext = new JdbcAggregateChangeExecutionContext(converter,
accessStrategy);
Expand All @@ -62,13 +63,13 @@ <T> T execute(AggregateChangeWithRoot<T> aggregateChange) {
}

/**
* Execute an aggregate change without a root entity.
* Execute a delete aggregate change.
*
* @param aggregateChange the aggregate change to be executed. Must not be {@literal null}.
* @param <T> the type of the aggregate root.
* @since 3.0
*/
<T> void execute(AggregateChange<T> aggregateChange) {
<T> void executeDelete(AggregateChange<T> aggregateChange) {

JdbcAggregateChangeExecutionContext executionContext = new JdbcAggregateChangeExecutionContext(converter,
accessStrategy);
Expand All @@ -83,8 +84,8 @@ private void execute(DbAction<?> action, JdbcAggregateChangeExecutionContext exe
executionContext.executeInsertRoot((DbAction.InsertRoot<?>) action);
} else if (action instanceof DbAction.Insert) {
executionContext.executeInsert((DbAction.Insert<?>) action);
} else if (action instanceof DbAction.InsertBatch) {
executionContext.executeInsertBatch((DbAction.InsertBatch<?>) action);
} else if (action instanceof DbAction.BatchInsert) {
executionContext.executeBatchInsert((DbAction.BatchInsert<?>) action);
} else if (action instanceof DbAction.UpdateRoot) {
executionContext.executeUpdateRoot((DbAction.UpdateRoot<?>) action);
} else if (action instanceof DbAction.Delete) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ <T> void executeInsert(DbAction.Insert<T> insert) {
add(new DbActionExecutionResult(insert, id));
}

<T> void executeInsertBatch(DbAction.InsertBatch<T> insertBatch) {
<T> void executeBatchInsert(DbAction.BatchInsert<T> batchInsert) {

List<DbAction.Insert<T>> inserts = insertBatch.getInserts();
List<DbAction.Insert<T>> inserts = batchInsert.getActions();
List<InsertSubject<T>> insertSubjects = inserts.stream()
.map(insert -> InsertSubject.describedBy(insert.getEntity(), getParentKeys(insert, converter)))
.collect(Collectors.toList());

Object[] ids = accessStrategy.insert(insertSubjects, insertBatch.getEntityType(), insertBatch.getIdValueSource());
Object[] ids = accessStrategy.insert(insertSubjects, batchInsert.getEntityType(), batchInsert.getBatchValue());

for (int i = 0; i < inserts.size(); i++) {
add(new DbActionExecutionResult(inserts.get(i), ids.length > 0 ? ids[i] : null));
Expand Down Expand Up @@ -216,14 +216,16 @@ private Object getIdFrom(DbAction.WithEntity<?> idOwningAction) {
return identifier;
}

<T> T populateIdsIfNecessary() {
<T> List<T> populateIdsIfNecessary() {

// have the results so that the inserts on the leaves come first.
List<DbActionExecutionResult> reverseResults = new ArrayList<>(results.values());
Collections.reverse(reverseResults);

StagedValues cascadingValues = new StagedValues();

List<T> roots = new ArrayList<>(reverseResults.size());

for (DbActionExecutionResult result : reverseResults) {

DbAction.WithEntity<?> action = result.getAction();
Expand All @@ -232,7 +234,7 @@ <T> T populateIdsIfNecessary() {

if (action instanceof DbAction.InsertRoot || action instanceof DbAction.UpdateRoot) {
// noinspection unchecked
return (T) newEntity;
roots.add((T) newEntity);
}

// the id property was immutable so we have to propagate changes up the tree
Expand All @@ -246,9 +248,15 @@ <T> T populateIdsIfNecessary() {
}
}

throw new IllegalStateException(
String.format("Cannot retrieve the resulting instance unless a %s or %s action was successfully executed.",
DbAction.InsertRoot.class.getName(), DbAction.UpdateRoot.class.getName()));
if (roots.isEmpty()) {
throw new IllegalStateException(
String.format("Cannot retrieve the resulting instance(s) unless a %s or %s action was successfully executed.",
DbAction.InsertRoot.class.getName(), DbAction.UpdateRoot.class.getName()));
}

Collections.reverse(roots);

return roots;
}

private <S> Object setIdAndCascadingProperties(DbAction.WithEntity<S> action, @Nullable Object generatedId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2021 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
* @author Jens Schauder
* @author Thomas Lang
* @author Milan Milanov
* @author Chirag Tailor
*/
public interface JdbcAggregateOperations {

Expand All @@ -38,6 +39,16 @@ public interface JdbcAggregateOperations {
*/
<T> T save(T instance);

/**
* Saves all aggregate instances, including all the members of each aggregate instance.
*
* @param instances the aggregate roots to be saved. Must not be {@code null}.
* @param <T> the type of the aggregate root.
* @return the saved instances.
* @since 3.0
*/
<T> Iterable<T> saveAll(Iterable<T> instances);

/**
* Dedicated insert function. This skips the test if the aggregate root is new and makes an insert.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.springframework.data.jdbc.core;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -31,7 +32,8 @@
import org.springframework.data.mapping.IdentifierAccessor;
import org.springframework.data.mapping.callback.EntityCallbacks;
import org.springframework.data.relational.core.conversion.AggregateChange;
import org.springframework.data.relational.core.conversion.AggregateChangeWithRoot;
import org.springframework.data.relational.core.conversion.RootAggregateChange;
import org.springframework.data.relational.core.conversion.BatchingAggregateChange;
import org.springframework.data.relational.core.conversion.MutableAggregateChange;
import org.springframework.data.relational.core.conversion.RelationalEntityDeleteWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityInsertWriter;
Expand All @@ -44,6 +46,7 @@
import org.springframework.data.support.PageableExecutionUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
* {@link JdbcAggregateOperations} implementation, storing aggregates in and obtaining them from a JDBC data store.
Expand Down Expand Up @@ -139,13 +142,15 @@ public <T> T save(T instance) {

Assert.notNull(instance, "Aggregate instance must not be null!");

RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());
return performSave(instance, changeCreatorSelectorForSave(instance));
}

Function<T, AggregateChangeWithRoot<T>> changeCreator = persistentEntity.isNew(instance)
? entity -> createInsertChange(prepareVersionForInsert(entity))
: entity -> createUpdateChange(prepareVersionForUpdate(entity));
@Override
public <T> Iterable<T> saveAll(Iterable<T> instances) {

Assert.isTrue(instances.iterator().hasNext(), "Aggregate instances must not be empty!");

return store(instance, changeCreator, persistentEntity);
return performSaveAll(instances);
}

/**
Expand All @@ -160,9 +165,7 @@ public <T> T insert(T instance) {

Assert.notNull(instance, "Aggregate instance must not be null!");

RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());

return store(instance, entity -> createInsertChange(prepareVersionForInsert(entity)), persistentEntity);
return performSave(instance, entity -> createInsertChange(prepareVersionForInsert(entity)));
}

/**
Expand All @@ -177,9 +180,7 @@ public <T> T update(T instance) {

Assert.notNull(instance, "Aggregate instance must not be null!");

RelationalPersistentEntity<?> persistentEntity = context.getRequiredPersistentEntity(instance.getClass());

return store(instance, entity -> createUpdateChange(prepareVersionForUpdate(entity)), persistentEntity);
return performSave(instance, entity -> createUpdateChange(prepareVersionForUpdate(entity)));
}

@Override
Expand Down Expand Up @@ -278,29 +279,33 @@ public void deleteAll(Class<?> domainType) {
Assert.notNull(domainType, "Domain type must not be null!");

MutableAggregateChange<?> change = createDeletingChange(domainType);
executor.execute(change);
executor.executeDelete(change);
}

private <T> T afterExecute(AggregateChange<T> change, T entityAfterExecution) {

Object identifier = context.getRequiredPersistentEntity(change.getEntityType())
.getIdentifierAccessor(entityAfterExecution).getIdentifier();

Assert.notNull(identifier, "After saving the identifier must not be null!");

return triggerAfterSave(entityAfterExecution, change);
}

private <T> T store(T aggregateRoot, Function<T, AggregateChangeWithRoot<T>> changeCreator,
RelationalPersistentEntity<?> persistentEntity) {
private <T> RootAggregateChange<T> beforeExecute(T aggregateRoot,
Function<T, RootAggregateChange<T>> changeCreator) {

Assert.notNull(aggregateRoot, "Aggregate instance must not be null!");

aggregateRoot = triggerBeforeConvert(aggregateRoot);

AggregateChangeWithRoot<T> change = changeCreator.apply(aggregateRoot);
RootAggregateChange<T> change = changeCreator.apply(aggregateRoot);

aggregateRoot = triggerBeforeSave(change.getRoot(), change);

change.setRoot(aggregateRoot);

T entityAfterExecution = executor.execute(change);

Object identifier = persistentEntity.getIdentifierAccessor(entityAfterExecution).getIdentifier();

Assert.notNull(identifier, "After saving the identifier must not be null!");

return triggerAfterSave(entityAfterExecution, change);
return change;
}

private <T> void deleteTree(Object id, @Nullable T entity, Class<T> domainType) {
Expand All @@ -309,23 +314,70 @@ private <T> void deleteTree(Object id, @Nullable T entity, Class<T> domainType)

entity = triggerBeforeDelete(entity, id, change);

executor.execute(change);
executor.executeDelete(change);

triggerAfterDelete(entity, id, change);
}

private <T> AggregateChangeWithRoot<T> createInsertChange(T instance) {
private <T> T performSave(T instance, Function<T, RootAggregateChange<T>> changeCreator) {

// noinspection unchecked
BatchingAggregateChange<T, RootAggregateChange<T>> batchingAggregateChange = //
BatchingAggregateChange.forSave((Class<T>) ClassUtils.getUserClass(instance));
batchingAggregateChange.add(beforeExecute(instance, changeCreator));

Iterator<T> afterExecutionIterator = executor.executeSave(batchingAggregateChange).iterator();

Assert.isTrue(afterExecutionIterator.hasNext(), "Instances after execution must not be empty!");

return afterExecute(batchingAggregateChange, afterExecutionIterator.next());
}

private <T> List<T> performSaveAll(Iterable<T> instances) {

Iterator<T> iterator = instances.iterator();
T firstInstance = iterator.next();

// noinspection unchecked
BatchingAggregateChange<T, RootAggregateChange<T>> batchingAggregateChange = //
BatchingAggregateChange.forSave((Class<T>) ClassUtils.getUserClass(firstInstance));
batchingAggregateChange.add(beforeExecute(firstInstance, changeCreatorSelectorForSave(firstInstance)));

while (iterator.hasNext()) {
T instance = iterator.next();
batchingAggregateChange.add(beforeExecute(instance, changeCreatorSelectorForSave(instance)));
}

List<T> instancesAfterExecution = executor.executeSave(batchingAggregateChange);

ArrayList<T> results = new ArrayList<>(instancesAfterExecution.size());
for (T instance : instancesAfterExecution) {
results.add(afterExecute(batchingAggregateChange, instance));
}

return results;
}

private <T> Function<T, RootAggregateChange<T>> changeCreatorSelectorForSave(T instance) {

return context.getRequiredPersistentEntity(instance.getClass()).isNew(instance)
? entity -> createInsertChange(prepareVersionForInsert(entity))
: entity -> createUpdateChange(prepareVersionForUpdate(entity));
}

private <T> RootAggregateChange<T> createInsertChange(T instance) {

AggregateChangeWithRoot<T> aggregateChange = MutableAggregateChange.forSave(instance);
RootAggregateChange<T> aggregateChange = MutableAggregateChange.forSave(instance);
new RelationalEntityInsertWriter<T>(context).write(instance, aggregateChange);
return aggregateChange;
}

private <T> AggregateChangeWithRoot<T> createUpdateChange(EntityAndPreviousVersion<T> entityAndVersion) {
private <T> RootAggregateChange<T> createUpdateChange(EntityAndPreviousVersion<T> entityAndVersion) {

AggregateChangeWithRoot<T> aggregateChange = MutableAggregateChange.forSave(entityAndVersion.entity,
RootAggregateChange<T> aggregateChange = MutableAggregateChange.forSave(entityAndVersion.entity,
entityAndVersion.version);
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity, aggregateChange);
new RelationalEntityUpdateWriter<T>(context).write(entityAndVersion.entity,
aggregateChange);
return aggregateChange;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ public interface DataAccessStrategy extends RelationResolver {
* {@link Map} or {@link List}.
* @return the id generated by the database if any.
* @since 1.1
* @deprecated since 2.4, use {@link #insert(Object, Class, Identifier, IdValueSource)}. This will no longer insert as
* expected when the id property of the instance is pre-populated.
* @deprecated since 2.4, use {@link #insert(Object, Class, Identifier, IdValueSource)}.
*/
@Nullable
@Deprecated
Expand Down
Loading