Skip to content

DATAES-771 - Add after-save entity callbacks support. #414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.event.AfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.BeforeConvertCallback;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
Expand All @@ -45,6 +46,7 @@
*
* @author Sascha Woo
* @author Peter-Josef Meisch
* @author Roman Puchkovskiy
*/
public abstract class AbstractElasticsearchTemplate implements ElasticsearchOperations, ApplicationContextAware {

Expand Down Expand Up @@ -117,8 +119,13 @@ public <T> T save(T entity, IndexCoordinates index) {
Assert.notNull(entity, "entity must not be null");
Assert.notNull(index, "index must not be null");

index(getIndexQuery(entity), index);
return entity;
IndexQuery query = getIndexQuery(entity);
index(query, index);

// suppressing because it's either entity itself or something of a correct type returned by an entity callback
@SuppressWarnings("unchecked")
T castResult = (T) query.getObject();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must not be null, see my comment for the maybeCallbackAfterSave(T entity) method.

Copy link
Contributor Author

@rpuch rpuch Mar 29, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

query.getObject() can be null either if entity is null (which is impossible, there is an assertion in the beginning of the method), or if entityCallbacks.callback() returns null (which is also impossible).
So the castResult cannot be null.

Would you still prefer to assert that it is not null, just in case?

return castResult;
}

@Override
Expand Down Expand Up @@ -151,7 +158,10 @@ public <T> Iterable<T> save(Iterable<T> entities, IndexCoordinates index) {
});
}

return entities;
return indexQueries.stream()
.map(IndexQuery::getObject)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must not be null, see my comment for the maybeCallbackAfterSave(T entity) method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, this can only be null if some entity in the original Iterable is null.

Do you still suggest adding a non-null assertion?

.map(entity -> (T) entity)
.collect(Collectors.toList());
}

@Override
Expand Down Expand Up @@ -455,11 +465,39 @@ protected void maybeCallbackBeforeConvertWithQuery(Object query) {
}

// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
// don't have a common bas class, therefore the List<?> argument
// don't have a common base class, therefore the List<?> argument
protected void maybeCallbackBeforeConvertWithQueries(List<?> queries) {
queries.forEach(this::maybeCallbackBeforeConvertWithQuery);
}

protected <T> T maybeCallbackAfterSave(T entity) {

if (entityCallbacks != null) {
return entityCallbacks.callback(AfterSaveCallback.class, entity);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make sure that the value returned from the callback is not null. Although the API has neither the parameter or return value marked as nullable, we might get a null value by some bad implemention of an after-save callback. So I think we should assert this here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the code of SimpleEntityCallbackInvoker, it looke like it makes sure that the value returned by a callback is non-null:

				throw new IllegalArgumentException(
						String.format("Callback invocation on %s returned null value for %s", callback.getClass(), entity));

Also, javadoc for EntityCallbackInvoker#invokeCallback() says that the returned value is 'never null'.
So even a callback that returns null will just cause an exception somewhere in EntityCallbacks implementation, and here we will not get a null value.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, I didn't dig this deep into the code for this PR, sorry (wish this were Kotlin stuff!), so that's ok

}

return entity;
}

protected void maybeCallbackAfterSaveWithQuery(Object query) {

if (query instanceof IndexQuery) {
IndexQuery indexQuery = (IndexQuery) query;
Object queryObject = indexQuery.getObject();

if (queryObject != null) {
queryObject = maybeCallbackAfterSave(queryObject);
indexQuery.setObject(queryObject);
}
}
}

// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
// don't have a common base class, therefore the List<?> argument
protected void maybeCallbackAfterSaveWithQueries(List<?> queries) {
queries.forEach(this::maybeCallbackAfterSaveWithQuery);
}

// endregion

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ public String index(IndexQuery query, IndexCoordinates index) {
if (queryObject != null) {
setPersistentEntityId(queryObject, documentId);
}

maybeCallbackAfterSaveWithQuery(query);

return documentId;
}

Expand Down Expand Up @@ -226,7 +229,10 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
maybeCallbackBeforeConvertWithQueries(queries);
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
return checkForBulkOperationFailure(execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
List<String> ids = checkForBulkOperationFailure(execute(
client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
maybeCallbackAfterSaveWithQueries(queries);
return ids;
}
// endregion

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
Expand Down Expand Up @@ -79,6 +77,7 @@
* @author Martin Choraine
* @author Farid Azaza
* @author Gyula Attila Csorogi
* @author Roman Puchkovskiy
* @deprecated as of 4.0
*/
@Deprecated
Expand Down Expand Up @@ -153,6 +152,8 @@ public String index(IndexQuery query, IndexCoordinates index) {
setPersistentEntityId(queryObject, documentId);
}

maybeCallbackAfterSaveWithQuery(query);

return documentId;
}

Expand Down Expand Up @@ -188,7 +189,11 @@ public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions,
Assert.notNull(queries, "List of IndexQuery must not be null");
Assert.notNull(bulkOptions, "BulkOptions must not be null");

return doBulkOperation(queries, bulkOptions, index);
List<String> ids = doBulkOperation(queries, bulkOptions, index);

maybeCallbackAfterSaveWithQueries(queries);

return ids;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
Expand Down Expand Up @@ -97,6 +98,7 @@
* @author Peter-Josef Meisch
* @author Mathias Teier
* @author Aleksei Arsenev
* @author Roman Puchkovskiy
* @since 3.2
*/
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
Expand Down Expand Up @@ -185,7 +187,8 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
return doIndex(entity, adaptableEntity, index) //
.map(it -> {
return adaptableEntity.populateIdIfNecessary(it.getId());
});
})
.flatMap(this::maybeCallAfterSave);
}

@Override
Expand Down Expand Up @@ -213,11 +216,11 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, Ind
.map(e -> getIndexQuery(e.getBean(), e)) //
.collect(Collectors.toList());
return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) //
.map(bulkItemResponse -> {
.flatMap(bulkItemResponse -> {

AdaptibleEntity<? extends T> mappedEntity = iterator.next();
mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
return mappedEntity.getBean();
return maybeCallAfterSave(mappedEntity.getBean());
});
});
}
Expand Down Expand Up @@ -882,5 +885,14 @@ protected <T> Mono<T> maybeCallBeforeConvert(T entity) {

return Mono.just(entity);
}

protected <T> Mono<T> maybeCallAfterSave(T entity) {

if (null != entityCallbacks) {
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity);
}

return Mono.just(entity);
}
// endregion
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.event;

import org.springframework.data.mapping.callback.EntityCallback;
import org.springframework.data.mapping.callback.EntityCallbacks;

/**
* Entity callback triggered after save of an entity.
*
* @author Roman Puchkovskiy
* @since 4.0
* @see EntityCallbacks
*/
@FunctionalInterface
public interface AfterSaveCallback<T> extends EntityCallback<T> {

/**
* Entity callback method invoked after a domain object is saved. Can return either the same or a modified
* instance of the domain object.
*
* @param entity the domain object that was saved.
* @return the domain object that was persisted.
*/
T onAfterSave(T entity);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.event;

import org.reactivestreams.Publisher;
import org.springframework.data.mapping.callback.EntityCallback;
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;

/**
* Entity callback triggered after save of an entity.
*
* @author Roman Puchkovskiy
* @since 4.0
* @see ReactiveEntityCallbacks
*/
@FunctionalInterface
public interface ReactiveAfterSaveCallback<T> extends EntityCallback<T> {

/**
* Entity callback method invoked after a domain object is saved. Can return either the same or a modified
* instance of the domain object.
*
* @param entity the domain object that was saved.
* @return a {@link Publisher} emitting the domain object to be returned to the caller.
*/
Publisher<T> onAfterSave(T entity);
}
Loading