From 7709166b2e782fe7876da2440fd8ece3d1fb62c8 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Wed, 11 Oct 2023 21:40:45 +0200 Subject: [PATCH] Enhance refresh policy handling --- .../elasticsearch/elasticsearch-new.adoc | 1 + .../core/AbstractElasticsearchTemplate.java | 10 +- ...AbstractReactiveElasticsearchTemplate.java | 10 +- .../core/ElasticsearchOperations.java | 15 ++- .../core/ReactiveElasticsearchOperations.java | 11 +- .../repository/ElasticsearchRepository.java | 37 ++++++ .../ReactiveElasticsearchRepository.java | 61 +++++++++- .../SimpleElasticsearchRepository.java | 105 ++++++++++++++++- ...SimpleReactiveElasticsearchRepository.java | 109 +++++++++++++++++- 9 files changed, 344 insertions(+), 15 deletions(-) diff --git a/src/main/antora/modules/ROOT/pages/elasticsearch/elasticsearch-new.adoc b/src/main/antora/modules/ROOT/pages/elasticsearch/elasticsearch-new.adoc index 1f14c388be..7347804b85 100644 --- a/src/main/antora/modules/ROOT/pages/elasticsearch/elasticsearch-new.adoc +++ b/src/main/antora/modules/ROOT/pages/elasticsearch/elasticsearch-new.adoc @@ -12,6 +12,7 @@ * Enable MultiField annotation on property getter * Support nested sort option * Improved scripted und runtime field support +* Improved refresh policy support [[new-features.5-1-0]] == New in Spring Data Elasticsearch 5.1 diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 1460652acb..4d19d96f94 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -855,7 +855,7 @@ public SearchScrollHits doWith(SearchDocumentResponse response) { } // endregion - // region routing + // region customization private void setRoutingResolver(RoutingResolver routingResolver) { Assert.notNull(routingResolver, "routingResolver must not be null"); @@ -873,5 +873,13 @@ public ElasticsearchOperations withRouting(RoutingResolver routingResolver) { return copy; } + @Override + public ElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) { + + var copy = copy(); + copy.setRefreshPolicy(refreshPolicy); + return copy; + } + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java index 7cfbad7b7c..6c81dfbee8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java @@ -186,7 +186,7 @@ public Mono logVersions() { // endregion - // region routing + // region customizations private void setRoutingResolver(RoutingResolver routingResolver) { Assert.notNull(routingResolver, "routingResolver must not be null"); @@ -203,6 +203,14 @@ public ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolv copy.setRoutingResolver(routingResolver); return copy; } + + @Override + public ReactiveElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy) { + AbstractReactiveElasticsearchTemplate copy = copy(); + copy.setRefreshPolicy(refreshPolicy); + return copy; + } + // endregion // region DocumentOperations diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index 51cc283a4d..be35f840dc 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -15,8 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import java.util.Objects; - import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -91,15 +89,24 @@ default String convertId(@Nullable Object idValue) { } // endregion - // region routing + // region customizations /** * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to * obtain routing information. * * @param routingResolver the {@link RoutingResolver} value, must not be {@literal null}. - * @return DocumentOperations instance + * @return {@link ElasticsearchOperations} instance * @since 4.2 */ ElasticsearchOperations withRouting(RoutingResolver routingResolver); + + /** + * Returns a copy of this instance with the same configuration, but that uses a different {@link RefreshPolicy}. + * + * @param refreshPolicy the {@link RefreshPolicy} value. + * @return {@link ElasticsearchOperations} instance. + * @since 5.2 + */ + ElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy); // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java index 0927fb4acc..e9304a5f06 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java @@ -77,7 +77,7 @@ public interface ReactiveElasticsearchOperations */ ReactiveClusterOperations cluster(); - // region routing + // region customizations /** * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to * obtain routing information. @@ -86,5 +86,14 @@ public interface ReactiveElasticsearchOperations * @return DocumentOperations instance */ ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver); + + /** + * Returns a copy of this instance with the same configuration, but that uses a different {@link RefreshPolicy}. + * + * @param refreshPolicy the {@link RefreshPolicy} value. + * @return {@link ReactiveElasticsearchOperations} instance. + * @since 5.2 + */ + ReactiveElasticsearchOperations withRefreshPolicy(@Nullable RefreshPolicy refreshPolicy); // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/ElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/ElasticsearchRepository.java index 01e55ea103..fb7b83c8a9 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/ElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/ElasticsearchRepository.java @@ -17,6 +17,7 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.repository.CrudRepository; import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.PagingAndSortingRepository; @@ -31,6 +32,7 @@ * @author Murali Chevuri * @author Peter-Josef Meisch */ +@SuppressWarnings("unused") @NoRepositoryBean public interface ElasticsearchRepository extends PagingAndSortingRepository, CrudRepository { @@ -43,4 +45,39 @@ public interface ElasticsearchRepository extends PagingAndSortingReposito * @return */ Page searchSimilar(T entity, @Nullable String[] fields, Pageable pageable); + + /** + * @since 5.2 + */ + S save(S entity, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Iterable saveAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + void deleteById(ID id, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + void delete(T entity, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + void deleteAllById(Iterable ids, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + void deleteAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + void deleteAll(@Nullable RefreshPolicy refreshPolicy); } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/ReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/ReactiveElasticsearchRepository.java index a94d626dc9..f6ad0125fd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/ReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/ReactiveElasticsearchRepository.java @@ -1,7 +1,7 @@ /* * Copyright 2019-2023 the original author or authors. * - * Licensed under the Apache License, Version 2.0 (the "License"); + * Licensed under the Apache License, Version 2.0 (the "License", @Nullable RefreshPolicy refreshPolicy); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -15,9 +15,15 @@ */ package org.springframework.data.elasticsearch.repository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.reactivestreams.Publisher; +import org.springframework.data.elasticsearch.core.RefreshPolicy; import org.springframework.data.repository.NoRepositoryBean; import org.springframework.data.repository.reactive.ReactiveCrudRepository; import org.springframework.data.repository.reactive.ReactiveSortingRepository; +import org.springframework.lang.Nullable; /** * Elasticsearch specific {@link org.springframework.data.repository.Repository} interface with reactive support. @@ -25,6 +31,57 @@ * @author Christoph Strobl * @since 3.2 */ +@SuppressWarnings("unused") @NoRepositoryBean public interface ReactiveElasticsearchRepository - extends ReactiveSortingRepository, ReactiveCrudRepository {} + extends ReactiveSortingRepository, ReactiveCrudRepository { + /** + * @since 5.2 + */ + Mono save(S entity, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Flux saveAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Flux saveAll(Publisher entityStream, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteById(ID id, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteById(Publisher id, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono delete(T entity, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteAllById(Iterable ids, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteAll(Publisher entityStream, @Nullable RefreshPolicy refreshPolicy); + + /** + * @since 5.2 + */ + Mono deleteAll(@Nullable RefreshPolicy refreshPolicy); +} diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java index e6673a76df..87cad402c1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleElasticsearchRepository.java @@ -21,6 +21,7 @@ import java.util.Optional; import java.util.stream.Collectors; +import org.jetbrains.annotations.NotNull; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; @@ -177,10 +178,19 @@ public S save(S entity) { Assert.notNull(entity, "Cannot save 'null' entity."); - // noinspection ConstantConditions + // noinspection DataFlowIssue return executeAndRefresh(operations -> operations.save(entity, getIndexCoordinates())); } + @Override + public S save(S entity, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entity, "entity must not be null"); + + // noinspection DataFlowIssue + return executeAndRefresh(operations -> operations.save(entity, getIndexCoordinates()), refreshPolicy); + } + public List save(List entities) { Assert.notNull(entities, "Cannot insert 'null' as a List."); @@ -188,6 +198,13 @@ public List save(List entities) { return Streamable.of(saveAll(entities)).stream().collect(Collectors.toList()); } + public List save(List entities, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entities, "Cannot insert 'null' as a List."); + + return Streamable.of(saveAll(entities, refreshPolicy)).stream().collect(Collectors.toList()); + } + @Override public Iterable saveAll(Iterable entities) { @@ -199,6 +216,16 @@ public Iterable saveAll(Iterable entities) { return entities; } + @Override + public Iterable saveAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy) { + Assert.notNull(entities, "Cannot insert 'null' as a List."); + + IndexCoordinates indexCoordinates = getIndexCoordinates(); + executeAndRefresh(operations -> operations.save(entities, indexCoordinates), refreshPolicy); + + return entities; + } + @Override public boolean existsById(ID id) { // noinspection ConstantConditions @@ -233,6 +260,14 @@ public void deleteById(ID id) { doDelete(id, getIndexCoordinates()); } + @Override + public void deleteById(ID id, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(id, "Cannot delete entity with id 'null'."); + + doDelete(id, getIndexCoordinates(), refreshPolicy); + } + @Override public void delete(T entity) { @@ -241,9 +276,40 @@ public void delete(T entity) { doDelete(extractIdFromBean(entity), getIndexCoordinates()); } + @Override + public void delete(T entity, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entity, "Cannot delete 'null' entity."); + + doDelete(extractIdFromBean(entity), getIndexCoordinates(), refreshPolicy); + } + @Override public void deleteAllById(Iterable ids) { + // noinspection DuplicatedCode + Assert.notNull(ids, "Cannot delete 'null' list."); + + List idStrings = new ArrayList<>(); + for (ID id : ids) { + idStrings.add(stringIdRepresentation(id)); + } + + if (idStrings.isEmpty()) { + return; + } + + Query query = operations.idsQuery(idStrings); + executeAndRefresh((OperationsCallback) operations -> { + operations.delete(query, entityClass, getIndexCoordinates()); + return null; + }); + } + + @Override + public void deleteAllById(Iterable ids, @Nullable RefreshPolicy refreshPolicy) { + + // noinspection DuplicatedCode Assert.notNull(ids, "Cannot delete 'null' list."); List idStrings = new ArrayList<>(); @@ -264,7 +330,16 @@ public void deleteAllById(Iterable ids) { @Override public void deleteAll(Iterable entities) { + deleteAllById(getEntityIds(entities)); + } + + @Override + public void deleteAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy) { + deleteAllById(getEntityIds(entities), refreshPolicy); + } + @NotNull + private List getEntityIds(Iterable entities) { Assert.notNull(entities, "Cannot delete 'null' list."); List ids = new ArrayList<>(); @@ -274,8 +349,7 @@ public void deleteAll(Iterable entities) { ids.add(id); } } - - deleteAllById(ids); + return ids; } private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates) { @@ -285,16 +359,30 @@ private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates) { } } + private void doDelete(@Nullable ID id, IndexCoordinates indexCoordinates, @Nullable RefreshPolicy refreshPolicy) { + + if (id != null) { + executeAndRefresh(operations -> operations.delete(stringIdRepresentation(id), indexCoordinates), refreshPolicy); + } + } + @Override public void deleteAll() { - IndexCoordinates indexCoordinates = getIndexCoordinates(); executeAndRefresh((OperationsCallback) operations -> { - operations.delete(Query.findAll(), entityClass, indexCoordinates); + operations.delete(Query.findAll(), entityClass, getIndexCoordinates()); return null; }); } + @Override + public void deleteAll(@Nullable RefreshPolicy refreshPolicy) { + executeAndRefresh((OperationsCallback) operations -> { + operations.delete(Query.findAll(), entityClass, getIndexCoordinates()); + return null; + }, refreshPolicy); + } + private void doRefresh() { RefreshPolicy refreshPolicy = null; @@ -352,5 +440,12 @@ public R executeAndRefresh(OperationsCallback callback) { doRefresh(); return result; } + + @Nullable + public R executeAndRefresh(OperationsCallback callback, @Nullable RefreshPolicy refreshPolicy) { + R result = callback.doWithOperations(operations.withRefreshPolicy(refreshPolicy)); + doRefresh(); + return result; + } // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java index 175a33da8e..ccfdd4f669 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepository.java @@ -34,6 +34,7 @@ import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.repository.ReactiveElasticsearchRepository; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; /** @@ -96,13 +97,31 @@ public Mono save(S entity) { .flatMap(saved -> doRefresh().thenReturn(saved)); } + @Override + public Mono save(S entity, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entity, "Entity must not be null!"); + + return operations.withRefreshPolicy(refreshPolicy).save(entity, entityInformation.getIndexCoordinates()) + .flatMap(saved -> doRefresh().thenReturn(saved)); + } + @Override public Flux saveAll(Iterable entities) { Assert.notNull(entities, "Entities must not be null!"); + return saveAll(Flux.fromIterable(entities)); } + @Override + public Flux saveAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entities, "Entities must not be null!"); + + return saveAll(Flux.fromIterable(entities), refreshPolicy); + } + @Override public Flux saveAll(Publisher entityStream) { @@ -112,6 +131,16 @@ public Flux saveAll(Publisher entityStream) { .concatWith(doRefresh().then(Mono.empty())); } + @Override + public Flux saveAll(Publisher entityStream, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entityStream, "EntityStream must not be null!"); + + return operations.withRefreshPolicy(refreshPolicy) + .save(Flux.from(entityStream), entityInformation.getIndexCoordinates()) + .concatWith(doRefresh().then(Mono.empty())); + } + @Override public Mono findById(ID id) { @@ -192,25 +221,54 @@ public Mono count() { public Mono deleteById(ID id) { Assert.notNull(id, "Id must not be null!"); + return operations.delete(convertId(id), entityInformation.getIndexCoordinates()) // .then(doRefresh()); } + @Override + public Mono deleteById(ID id, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(id, "Id must not be null!"); + + return operations.withRefreshPolicy(refreshPolicy).delete(convertId(id), entityInformation.getIndexCoordinates()) // + .then(doRefresh()); + } + @Override public Mono deleteById(Publisher id) { Assert.notNull(id, "Id must not be null!"); + return Mono.from(id).flatMap(this::deleteById); } + @Override + public Mono deleteById(Publisher id, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(id, "Id must not be null!"); + + return Mono.from(id).flatMap(id2 -> deleteById(id, refreshPolicy)); + } + @Override public Mono delete(T entity) { Assert.notNull(entity, "Entity must not be null!"); + return operations.delete(entity, entityInformation.getIndexCoordinates()) // .then(doRefresh()); } + @Override + public Mono delete(T entity, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entity, "Entity must not be null!"); + + return operations.withRefreshPolicy(refreshPolicy).delete(entity, entityInformation.getIndexCoordinates()) // + .then(doRefresh()); + } + @Override public Mono deleteAllById(Iterable ids) { @@ -225,17 +283,43 @@ public Mono deleteAllById(Iterable ids) { .then(doRefresh()); } + @Override + public Mono deleteAllById(Iterable ids, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(ids, "Ids must not be null!"); + + var operationsWithRefreshPolicy = operations.withRefreshPolicy(refreshPolicy); + return Flux.fromIterable(ids) // + .map(this::convertId) // + .collectList() // + .map(operations::idsQuery) // + .flatMap( + query -> operationsWithRefreshPolicy.delete(query, entityInformation.getJavaType(), + entityInformation.getIndexCoordinates())) // + .then(doRefresh()); + } + @Override public Mono deleteAll(Iterable entities) { Assert.notNull(entities, "Entities must not be null!"); + return deleteAll(Flux.fromIterable(entities)); } + @Override + public Mono deleteAll(Iterable entities, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entities, "Entities must not be null!"); + + return deleteAll(Flux.fromIterable(entities), refreshPolicy); + } + @Override public Mono deleteAll(Publisher entityStream) { Assert.notNull(entityStream, "EntityStream must not be null!"); + return Flux.from(entityStream) // .map(entityInformation::getRequiredId) // .map(this::convertId) // @@ -247,12 +331,35 @@ public Mono deleteAll(Publisher entityStream) { } @Override - public Mono deleteAll() { + public Mono deleteAll(Publisher entityStream, @Nullable RefreshPolicy refreshPolicy) { + + Assert.notNull(entityStream, "EntityStream must not be null!"); + + var operationsWithRefreshPolicy = operations.withRefreshPolicy(refreshPolicy); + return Flux.from(entityStream) // + .map(entityInformation::getRequiredId) // + .map(this::convertId) // + .collectList() // + .map(operations::idsQuery) + .flatMap( + query -> operationsWithRefreshPolicy.delete(query, entityInformation.getJavaType(), + entityInformation.getIndexCoordinates())) // + .then(doRefresh()); + } + @Override + public Mono deleteAll() { return operations.delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) // .then(doRefresh()); } + @Override + public Mono deleteAll(@Nullable RefreshPolicy refreshPolicy) { + return operations.withRefreshPolicy(refreshPolicy) + .delete(Query.findAll(), entityInformation.getJavaType(), entityInformation.getIndexCoordinates()) // + .then(doRefresh()); + } + private String convertId(Object id) { return operations.getElasticsearchConverter().convertId(id); }