Skip to content

DATAJDBC-219 Implement optimistic record locking #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.BiConsumer;

Expand All @@ -33,6 +34,7 @@
import org.springframework.data.relational.core.conversion.DbAction;
import org.springframework.data.relational.core.conversion.Interpreter;
import org.springframework.data.relational.core.conversion.RelationalConverter;
import org.springframework.data.relational.core.conversion.RelationalEntityVersionUtils;
import org.springframework.data.relational.core.mapping.RelationalPersistentEntity;
import org.springframework.data.relational.core.mapping.RelationalPersistentProperty;
import org.springframework.data.util.Pair;
Expand All @@ -45,6 +47,7 @@
*
* @author Jens Schauder
* @author Mark Paluch
* @author Tyler Van Gorder
* @since 1.2
*/
class AggregateChangeExecutor {
Expand All @@ -60,7 +63,6 @@ class AggregateChangeExecutor {
this.context = converter.getMappingContext();
}

@SuppressWarnings("unchecked")
<T> void execute(AggregateChange<T> aggregateChange) {

List<DbAction<?>> actions = new ArrayList<>();
Expand All @@ -70,17 +72,42 @@ <T> void execute(AggregateChange<T> aggregateChange) {
actions.add(action);
});

T newRoot = (T) populateIdsIfNecessary(actions);

T newRoot = populateIdsIfNecessary(actions);
if (newRoot != null) {
newRoot = populateRootVersionIfNecessary(newRoot, actions);
aggregateChange.setEntity(newRoot);
}
}

@SuppressWarnings("unchecked")
@Nullable
private <T> T populateRootVersionIfNecessary(T newRoot, List<DbAction<?>> actions) {

// Does the root entity have a version attribute?
RelationalPersistentEntity<T> persistentEntity = (RelationalPersistentEntity<T>) context
.getRequiredPersistentEntity(newRoot.getClass());
if (!persistentEntity.hasVersionProperty()) {
return newRoot;
}

// Find the root action
Optional<DbAction<?>> rootAction = actions.parallelStream().filter(action -> action instanceof DbAction.WithVersion)
.findFirst();

if (!rootAction.isPresent()) {
// This really should never happen.
return newRoot;
}
DbAction.WithVersion<T> versionAction = (DbAction.WithVersion<T>) rootAction.get();

return RelationalEntityVersionUtils.setVersionNumberOnEntity(newRoot,
versionAction.getNextVersion(), persistentEntity, converter);
}

@Nullable
private Object populateIdsIfNecessary(List<DbAction<?>> actions) {
private <T> T populateIdsIfNecessary(List<DbAction<?>> actions) {

Object newRoot = null;
T newRoot = null;

// have the actions so that the inserts on the leaves come first.
List<DbAction<?>> reverseActions = new ArrayList<>(actions);
Expand All @@ -102,15 +129,15 @@ private Object populateIdsIfNecessary(List<DbAction<?>> actions) {
if (newEntity != ((DbAction.WithGeneratedId<?>) action).getEntity()) {

if (action instanceof DbAction.Insert) {
DbAction.Insert insert = (DbAction.Insert) action;
DbAction.Insert<?> insert = (DbAction.Insert<?>) action;

Pair qualifier = insert.getQualifier();
Pair<?, ?> qualifier = insert.getQualifier();

cascadingValues.stage(insert.getDependingOn(), insert.getPropertyPath(),
qualifier == null ? null : qualifier.getSecond(), newEntity);

} else if (action instanceof DbAction.InsertRoot) {
newRoot = newEntity;
newRoot = (T) newEntity;
}
}
}
Expand Down Expand Up @@ -140,7 +167,7 @@ private <S> Object setIdAndCascadingProperties(DbAction.WithGeneratedId<S> actio
}

@SuppressWarnings("unchecked")
private PersistentPropertyPath getRelativePath(DbAction action, PersistentPropertyPath pathToValue) {
private PersistentPropertyPath<?> getRelativePath(DbAction<?> action, PersistentPropertyPath<?> pathToValue) {

if (action instanceof DbAction.Insert) {
return pathToValue.getExtensionForBaseOf(((DbAction.Insert) action).getPropertyPath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import lombok.RequiredArgsConstructor;

import java.util.Collections;
import java.util.Map;

import org.springframework.dao.IncorrectUpdateSemanticsDataAccessException;
Expand All @@ -34,6 +33,8 @@
import org.springframework.data.relational.core.conversion.DbAction.Update;
import org.springframework.data.relational.core.conversion.DbAction.UpdateRoot;
import org.springframework.data.relational.core.conversion.Interpreter;
import org.springframework.data.relational.core.conversion.RelationalConverter;
import org.springframework.data.relational.core.conversion.RelationalEntityVersionUtils;
import org.springframework.data.relational.core.mapping.PersistentPropertyPathExtension;
import org.springframework.data.relational.core.mapping.RelationalMappingContext;
import org.springframework.data.relational.core.mapping.RelationalPersistentEntity;
Expand All @@ -48,11 +49,13 @@
* @author Jens Schauder
* @author Mark Paluch
* @author Myeonghyeon Lee
* @author Tyler Van Gorder
*/
@RequiredArgsConstructor
class DefaultJdbcInterpreter implements Interpreter {

public static final String UPDATE_FAILED = "Failed to update entity [%s]. Id [%s] not found in database.";
private final RelationalConverter converter;
private final RelationalMappingContext context;
private final DataAccessStrategy accessStrategy;

Expand All @@ -62,21 +65,40 @@ class DefaultJdbcInterpreter implements Interpreter {
*/
@Override
public <T> void interpret(Insert<T> insert) {

Object id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), getParentKeys(insert));

insert.setGeneratedId(id);
}

@SuppressWarnings("unchecked")
private <T> RelationalPersistentEntity<T> getRequiredPersistentEntity(Class<T> type) {
return (RelationalPersistentEntity<T>) context.getRequiredPersistentEntity(type);
}

/*
* (non-Javadoc)
* @see org.springframework.data.relational.core.conversion.Interpreter#interpret(org.springframework.data.relational.core.conversion.DbAction.InsertRoot)
*/
@Override
public <T> void interpret(InsertRoot<T> insert) {

Object id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), Collections.emptyMap());
insert.setGeneratedId(id);
RelationalPersistentEntity<T> persistentEntity = getRequiredPersistentEntity(insert.getEntityType());

if (persistentEntity.hasVersionProperty()) {
// The interpreter is responsible for setting the initial version on the entity prior to calling insert.
Number version = RelationalEntityVersionUtils.getVersionNumberFromEntity(insert.getEntity(), persistentEntity,
converter);
if (version != null && version.longValue() > 0) {
throw new IllegalArgumentException("The entity cannot be inserted because it already has a version.");
}
T rootEntity = RelationalEntityVersionUtils.setVersionNumberOnEntity(insert.getEntity(), 1, persistentEntity,
converter);
Object id = accessStrategy.insert(rootEntity, insert.getEntityType(), Identifier.empty());
insert.setNextVersion(1);
insert.setGeneratedId(id);
} else {
Object id = accessStrategy.insert(insert.getEntity(), insert.getEntityType(), Identifier.empty());
insert.setGeneratedId(id);
}
}

/*
Expand All @@ -100,10 +122,31 @@ public <T> void interpret(Update<T> update) {
@Override
public <T> void interpret(UpdateRoot<T> update) {

if (!accessStrategy.update(update.getEntity(), update.getEntityType())) {

throw new IncorrectUpdateSemanticsDataAccessException(
String.format(UPDATE_FAILED, update.getEntity(), getIdFrom(update)));
RelationalPersistentEntity<T> persistentEntity = getRequiredPersistentEntity(update.getEntityType());

if (persistentEntity.hasVersionProperty()) {
// If the root aggregate has a version property, increment it.
Number previousVersion = RelationalEntityVersionUtils.getVersionNumberFromEntity(update.getEntity(),
persistentEntity, converter);
Assert.notNull(previousVersion, "The root aggregate cannot be updated because the version property is null.");

T rootEntity = RelationalEntityVersionUtils.setVersionNumberOnEntity(update.getEntity(),
previousVersion.longValue() + 1, persistentEntity,
converter);

if (accessStrategy.updateWithVersion(rootEntity, update.getEntityType(), previousVersion)) {
// Successful update, set the in-memory version on the action.
update.setNextVersion(previousVersion);
} else {
throw new IncorrectUpdateSemanticsDataAccessException(
String.format(UPDATE_FAILED, update.getEntity(), getIdFrom(update)));
}
} else {
if (!accessStrategy.update(update.getEntity(), update.getEntityType())) {

throw new IncorrectUpdateSemanticsDataAccessException(
String.format(UPDATE_FAILED, update.getEntity(), getIdFrom(update)));
}
}
}

Expand Down Expand Up @@ -135,7 +178,16 @@ public <T> void interpret(Delete<T> delete) {
*/
@Override
public <T> void interpret(DeleteRoot<T> delete) {
accessStrategy.delete(delete.getRootId(), delete.getEntityType());

if (delete.getEntity() != null) {
RelationalPersistentEntity<T> persistentEntity = getRequiredPersistentEntity(delete.getEntityType());
if (persistentEntity.hasVersionProperty()) {
accessStrategy.deleteWithVersion(delete.getEntity(), delete.getEntityType());
return;
}
}

accessStrategy.delete(delete.getId(), delete.getEntityType());
}

/*
Expand Down Expand Up @@ -177,13 +229,13 @@ private Object getParentId(DbAction.WithDependingOn<?> action) {
PersistentPropertyPathExtension path = new PersistentPropertyPathExtension(context, action.getPropertyPath());
PersistentPropertyPathExtension idPath = path.getIdDefiningParentPath();

DbAction.WithEntity idOwningAction = getIdOwningAction(action, idPath);
DbAction.WithEntity<?> idOwningAction = getIdOwningAction(action, idPath);

return getIdFrom(idOwningAction);
}

@SuppressWarnings("unchecked")
private DbAction.WithEntity getIdOwningAction(DbAction.WithEntity action, PersistentPropertyPathExtension idPath) {
private DbAction.WithEntity<?> getIdOwningAction(DbAction.WithEntity<?> action,
PersistentPropertyPathExtension idPath) {

if (!(action instanceof DbAction.WithDependingOn)) {

Expand All @@ -193,7 +245,7 @@ private DbAction.WithEntity getIdOwningAction(DbAction.WithEntity action, Persis
return action;
}

DbAction.WithDependingOn withDependingOn = (DbAction.WithDependingOn) action;
DbAction.WithDependingOn<?> withDependingOn = (DbAction.WithDependingOn<?>) action;

if (idPath.matches(withDependingOn.getPropertyPath())) {
return action;
Expand All @@ -202,7 +254,7 @@ private DbAction.WithEntity getIdOwningAction(DbAction.WithEntity action, Persis
return getIdOwningAction(withDependingOn.getDependingOn(), idPath);
}

private Object getIdFrom(DbAction.WithEntity idOwningAction) {
private Object getIdFrom(DbAction.WithEntity<?> idOwningAction) {

if (idOwningAction instanceof DbAction.WithGeneratedId) {

Expand All @@ -221,4 +273,5 @@ private Object getIdFrom(DbAction.WithEntity idOwningAction) {

return identifier;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,20 @@
import org.springframework.data.relational.core.conversion.RelationalEntityDeleteWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityInsertWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityUpdateWriter;
import org.springframework.data.relational.core.conversion.RelationalEntityWriter;
import org.springframework.data.relational.core.mapping.RelationalMappingContext;
import org.springframework.data.relational.core.mapping.RelationalPersistentEntity;
import org.springframework.data.relational.core.mapping.event.*;
import org.springframework.data.relational.core.mapping.event.AfterDeleteCallback;
import org.springframework.data.relational.core.mapping.event.AfterDeleteEvent;
import org.springframework.data.relational.core.mapping.event.AfterLoadCallback;
import org.springframework.data.relational.core.mapping.event.AfterLoadEvent;
import org.springframework.data.relational.core.mapping.event.AfterSaveCallback;
import org.springframework.data.relational.core.mapping.event.AfterSaveEvent;
import org.springframework.data.relational.core.mapping.event.BeforeConvertCallback;
import org.springframework.data.relational.core.mapping.event.BeforeDeleteCallback;
import org.springframework.data.relational.core.mapping.event.BeforeDeleteEvent;
import org.springframework.data.relational.core.mapping.event.BeforeSaveCallback;
import org.springframework.data.relational.core.mapping.event.BeforeSaveEvent;
import org.springframework.data.relational.core.mapping.event.Identifier;
import org.springframework.data.relational.core.mapping.event.Identifier.Specified;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
Expand All @@ -51,10 +61,8 @@ public class JdbcAggregateTemplate implements JdbcAggregateOperations {

private final ApplicationEventPublisher publisher;
private final RelationalMappingContext context;
private final RelationalConverter converter;
private final Interpreter interpreter;

private final RelationalEntityWriter jdbcEntityWriter;
private final RelationalEntityDeleteWriter jdbcEntityDeleteWriter;
private final RelationalEntityInsertWriter jdbcEntityInsertWriter;
private final RelationalEntityUpdateWriter jdbcEntityUpdateWriter;
Expand Down Expand Up @@ -83,14 +91,12 @@ public JdbcAggregateTemplate(ApplicationContext publisher, RelationalMappingCont

this.publisher = publisher;
this.context = context;
this.converter = converter;
this.accessStrategy = dataAccessStrategy;

this.jdbcEntityWriter = new RelationalEntityWriter(context);
this.jdbcEntityInsertWriter = new RelationalEntityInsertWriter(context);
this.jdbcEntityUpdateWriter = new RelationalEntityUpdateWriter(context);
this.jdbcEntityDeleteWriter = new RelationalEntityDeleteWriter(context);
this.interpreter = new DefaultJdbcInterpreter(context, accessStrategy);
this.interpreter = new DefaultJdbcInterpreter(converter, context, accessStrategy);

this.executor = new AggregateChangeExecutor(interpreter, converter);

Expand All @@ -115,15 +121,12 @@ public JdbcAggregateTemplate(ApplicationEventPublisher publisher, RelationalMapp

this.publisher = publisher;
this.context = context;
this.converter = converter;
this.accessStrategy = dataAccessStrategy;

this.jdbcEntityWriter = new RelationalEntityWriter(context);
this.jdbcEntityInsertWriter = new RelationalEntityInsertWriter(context);
this.jdbcEntityUpdateWriter = new RelationalEntityUpdateWriter(context);
this.jdbcEntityDeleteWriter = new RelationalEntityDeleteWriter(context);
this.interpreter = new DefaultJdbcInterpreter(context, accessStrategy);

this.interpreter = new DefaultJdbcInterpreter(converter, context, accessStrategy);
this.executor = new AggregateChangeExecutor(interpreter, converter);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public <S> boolean update(S instance, Class<S> domainType) {
return collect(das -> das.update(instance, domainType));
}

/*
* (non-Javadoc)
* @see org.springframework.data.jdbc.core.DataAccessStrategy#updateWithVersion(java.lang.Object, java.lang.Class, java.lang.Number)
*/
@Override
public <S> boolean updateWithVersion(S instance, Class<S> domainType, Number previousVersion) {
return collect(das -> das.updateWithVersion(instance, domainType, previousVersion));
}

/*
* (non-Javadoc)
* @see org.springframework.data.jdbc.core.DataAccessStrategy#delete(java.lang.Object, java.lang.Class)
Expand All @@ -77,6 +86,15 @@ public void delete(Object id, Class<?> domainType) {
collectVoid(das -> das.delete(id, domainType));
}

/*
* (non-Javadoc)
* @see org.springframework.data.jdbc.core.DataAccessStrategy#deleteInstance(java.lang.Object, java.lang.Class)
*/
@Override
public <T> void deleteWithVersion(T instance, Class<T> domainType) {
collectVoid(das -> das.deleteWithVersion(instance, domainType));
}

/*
* (non-Javadoc)
* @see org.springframework.data.jdbc.core.DataAccessStrategy#delete(java.lang.Object, org.springframework.data.mapping.PersistentPropertyPath)
Expand Down
Loading