Skip to content

Commit df74883

Browse files
committed
DATAES-771 - Add after-save entity callbacks support.
This change enables to get callbacks when an entity gets saved to Elasticsearch.
1 parent 2ec61ab commit df74883

9 files changed

+944
-11
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

+42-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
2323
import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter;
2424
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
25+
import org.springframework.data.elasticsearch.core.event.AfterSaveCallback;
2526
import org.springframework.data.elasticsearch.core.event.BeforeConvertCallback;
2627
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
2728
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
@@ -45,6 +46,7 @@
4546
*
4647
* @author Sascha Woo
4748
* @author Peter-Josef Meisch
49+
* @author Roman Puchkovskiy
4850
*/
4951
public abstract class AbstractElasticsearchTemplate implements ElasticsearchOperations, ApplicationContextAware {
5052

@@ -117,8 +119,13 @@ public <T> T save(T entity, IndexCoordinates index) {
117119
Assert.notNull(entity, "entity must not be null");
118120
Assert.notNull(index, "index must not be null");
119121

120-
index(getIndexQuery(entity), index);
121-
return entity;
122+
IndexQuery query = getIndexQuery(entity);
123+
index(query, index);
124+
125+
// suppressing because it's either entity itself or something of a correct type returned by an entity callback
126+
@SuppressWarnings("unchecked")
127+
T castResult = (T) query.getObject();
128+
return castResult;
122129
}
123130

124131
@Override
@@ -151,7 +158,10 @@ public <T> Iterable<T> save(Iterable<T> entities, IndexCoordinates index) {
151158
});
152159
}
153160

154-
return entities;
161+
return indexQueries.stream()
162+
.map(IndexQuery::getObject)
163+
.map(entity -> (T) entity)
164+
.collect(Collectors.toList());
155165
}
156166

157167
@Override
@@ -455,11 +465,39 @@ protected void maybeCallbackBeforeConvertWithQuery(Object query) {
455465
}
456466

457467
// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
458-
// don't have a common bas class, therefore the List<?> argument
468+
// don't have a common base class, therefore the List<?> argument
459469
protected void maybeCallbackBeforeConvertWithQueries(List<?> queries) {
460470
queries.forEach(this::maybeCallbackBeforeConvertWithQuery);
461471
}
462472

473+
protected <T> T maybeCallbackAfterSave(T entity) {
474+
475+
if (entityCallbacks != null) {
476+
return entityCallbacks.callback(AfterSaveCallback.class, entity);
477+
}
478+
479+
return entity;
480+
}
481+
482+
protected void maybeCallbackAfterSaveWithQuery(Object query) {
483+
484+
if (query instanceof IndexQuery) {
485+
IndexQuery indexQuery = (IndexQuery) query;
486+
Object queryObject = indexQuery.getObject();
487+
488+
if (queryObject != null) {
489+
queryObject = maybeCallbackAfterSave(queryObject);
490+
indexQuery.setObject(queryObject);
491+
}
492+
}
493+
}
494+
495+
// this can be called with either a List<IndexQuery> or a List<UpdateQuery>; these query classes
496+
// don't have a common base class, therefore the List<?> argument
497+
protected void maybeCallbackAfterSaveWithQueries(List<?> queries) {
498+
queries.forEach(this::maybeCallbackAfterSaveWithQuery);
499+
}
500+
463501
// endregion
464502

465503
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ public String index(IndexQuery query, IndexCoordinates index) {
145145
if (queryObject != null) {
146146
setPersistentEntityId(queryObject, documentId);
147147
}
148+
149+
maybeCallbackAfterSaveWithQuery(query);
150+
148151
return documentId;
149152
}
150153

@@ -226,7 +229,10 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
226229
private List<String> doBulkOperation(List<?> queries, BulkOptions bulkOptions, IndexCoordinates index) {
227230
maybeCallbackBeforeConvertWithQueries(queries);
228231
BulkRequest bulkRequest = requestFactory.bulkRequest(queries, bulkOptions, index);
229-
return checkForBulkOperationFailure(execute(client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
232+
List<String> ids = checkForBulkOperationFailure(execute(
233+
client -> client.bulk(bulkRequest, RequestOptions.DEFAULT)));
234+
maybeCallbackAfterSaveWithQueries(queries);
235+
return ids;
230236
}
231237
// endregion
232238

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@
3131
import org.elasticsearch.action.search.MultiSearchResponse;
3232
import org.elasticsearch.action.search.SearchRequestBuilder;
3333
import org.elasticsearch.action.search.SearchResponse;
34-
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
3534
import org.elasticsearch.action.update.UpdateRequestBuilder;
3635
import org.elasticsearch.client.Client;
3736
import org.elasticsearch.common.unit.TimeValue;
3837
import org.elasticsearch.search.suggest.SuggestBuilder;
3938
import org.slf4j.Logger;
4039
import org.slf4j.LoggerFactory;
41-
import org.springframework.data.domain.Pageable;
4240
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
4341
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4442
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
@@ -79,6 +77,7 @@
7977
* @author Martin Choraine
8078
* @author Farid Azaza
8179
* @author Gyula Attila Csorogi
80+
* @author Roman Puchkovskiy
8281
* @deprecated as of 4.0
8382
*/
8483
@Deprecated
@@ -153,6 +152,8 @@ public String index(IndexQuery query, IndexCoordinates index) {
153152
setPersistentEntityId(queryObject, documentId);
154153
}
155154

155+
maybeCallbackAfterSaveWithQuery(query);
156+
156157
return documentId;
157158
}
158159

@@ -188,7 +189,11 @@ public List<String> bulkIndex(List<IndexQuery> queries, BulkOptions bulkOptions,
188189
Assert.notNull(queries, "List of IndexQuery must not be null");
189190
Assert.notNull(bulkOptions, "BulkOptions must not be null");
190191

191-
return doBulkOperation(queries, bulkOptions, index);
192+
List<String> ids = doBulkOperation(queries, bulkOptions, index);
193+
194+
maybeCallbackAfterSaveWithQueries(queries);
195+
196+
return ids;
192197
}
193198

194199
@Override

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
import org.springframework.data.elasticsearch.core.document.Document;
7171
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
7272
import org.springframework.data.elasticsearch.core.document.SearchDocument;
73+
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
7374
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
7475
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
7576
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
@@ -97,6 +98,7 @@
9798
* @author Peter-Josef Meisch
9899
* @author Mathias Teier
99100
* @author Aleksei Arsenev
101+
* @author Roman Puchkovskiy
100102
* @since 3.2
101103
*/
102104
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -185,7 +187,8 @@ public <T> Mono<T> save(T entity, IndexCoordinates index) {
185187
return doIndex(entity, adaptableEntity, index) //
186188
.map(it -> {
187189
return adaptableEntity.populateIdIfNecessary(it.getId());
188-
});
190+
})
191+
.flatMap(this::maybeCallAfterSave);
189192
}
190193

191194
@Override
@@ -213,11 +216,11 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entities, Ind
213216
.map(e -> getIndexQuery(e.getBean(), e)) //
214217
.collect(Collectors.toList());
215218
return doBulkOperation(indexRequests, BulkOptions.defaultOptions(), index) //
216-
.map(bulkItemResponse -> {
219+
.flatMap(bulkItemResponse -> {
217220

218221
AdaptibleEntity<? extends T> mappedEntity = iterator.next();
219222
mappedEntity.populateIdIfNecessary(bulkItemResponse.getResponse().getId());
220-
return mappedEntity.getBean();
223+
return maybeCallAfterSave(mappedEntity.getBean());
221224
});
222225
});
223226
}
@@ -882,5 +885,14 @@ protected <T> Mono<T> maybeCallBeforeConvert(T entity) {
882885

883886
return Mono.just(entity);
884887
}
888+
889+
protected <T> Mono<T> maybeCallAfterSave(T entity) {
890+
891+
if (null != entityCallbacks) {
892+
return entityCallbacks.callback(ReactiveAfterSaveCallback.class, entity);
893+
}
894+
895+
return Mono.just(entity);
896+
}
885897
// endregion
886898
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core.event;
17+
18+
import org.springframework.data.mapping.callback.EntityCallback;
19+
import org.springframework.data.mapping.callback.EntityCallbacks;
20+
21+
/**
22+
* Entity callback triggered after save of an entity.
23+
*
24+
* @author Roman Puchkovskiy
25+
* @since 4.0
26+
* @see EntityCallbacks
27+
*/
28+
@FunctionalInterface
29+
public interface AfterSaveCallback<T> extends EntityCallback<T> {
30+
31+
/**
32+
* Entity callback method invoked after a domain object is saved. Can return either the same or a modified
33+
* instance of the domain object.
34+
*
35+
* @param entity the domain object that was saved.
36+
* @return the domain object that was persisted.
37+
*/
38+
T onAfterSave(T entity);
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.elasticsearch.core.event;
17+
18+
import org.reactivestreams.Publisher;
19+
import org.springframework.data.mapping.callback.EntityCallback;
20+
import org.springframework.data.mapping.callback.ReactiveEntityCallbacks;
21+
22+
/**
23+
* Entity callback triggered after save of an entity.
24+
*
25+
* @author Roman Puchkovskiy
26+
* @since 4.0
27+
* @see ReactiveEntityCallbacks
28+
*/
29+
@FunctionalInterface
30+
public interface ReactiveAfterSaveCallback<T> extends EntityCallback<T> {
31+
32+
/**
33+
* Entity callback method invoked after a domain object is saved. Can return either the same or a modified
34+
* instance of the domain object.
35+
*
36+
* @param entity the domain object that was saved.
37+
* @return a {@link Publisher} emitting the domain object to be returned to the caller.
38+
*/
39+
Publisher<T> onAfterSave(T entity);
40+
}

0 commit comments

Comments
 (0)