From 5e0106d8d15905e0045d80da58e060bc8a0ddd5c Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 13 Feb 2021 12:45:53 +0100 Subject: [PATCH] Improve delete-by-query returned information. --- ...elasticsearch-migration-guide-4.1-4.2.adoc | 6 +++ .../DefaultReactiveElasticsearchClient.java | 6 +-- .../reactive/ReactiveElasticsearchClient.java | 8 +-- .../core/AbstractElasticsearchTemplate.java | 5 +- .../core/DocumentOperations.java | 12 +++-- .../core/ElasticsearchRestTemplate.java | 10 ++-- .../core/ElasticsearchTemplate.java | 10 ++-- .../core/ReactiveDocumentOperations.java | 8 +-- .../core/ReactiveElasticsearchTemplate.java | 14 +++--- ...ueryResponse.java => ByQueryResponse.java} | 50 +++++++++---------- ...tReactiveElasticsearchRepositoryQuery.java | 4 +- ...veElasticsearchClientIntegrationTests.java | 6 +-- ...ElasticsearchTemplateIntegrationTests.java | 8 ++- ...eReactiveElasticsearchRepositoryTests.java | 1 + 14 files changed, 83 insertions(+), 65 deletions(-) rename src/main/java/org/springframework/data/elasticsearch/core/query/{UpdateByQueryResponse.java => ByQueryResponse.java} (83%) diff --git a/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc index 87390069e..756b39ca1 100644 --- a/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc +++ b/src/main/asciidoc/reference/elasticsearch-migration-guide-4.1-4.2.adoc @@ -47,3 +47,9 @@ If a refresh policy is set, then it will be used by the repositories as well. When configuring Spring Data Elasticsearch like described in <> by using `ElasticsearchConfigurationSupport`, `AbstractElasticsearchConfiguration` or `AbstractReactiveElasticsearchConfiguration` the refresh policy will be initialized to `null`. Previously the reactive code initialized this to `IMMEDIATE`, now reactive and non-reactive code show the same behaviour. + +=== Method return types + +==== delete methods that take a Query + +The reactive methods previously returned a `Mono` with the number of deleted documents, the non reactive versions were void. They now return a `Mono` which contains much more detailed information about the deleted documents and errors that might have occurred. diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index a89100e89..149caf258 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -115,7 +115,7 @@ import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.ScrollState; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.util.Lazy; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; @@ -524,10 +524,10 @@ public Mono deleteBy(HttpHeaders headers, DeleteByQueryReq } @Override - public Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) { + public Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) { return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) // .next() // - .map(UpdateByQueryResponse::of); + .map(ByQueryResponse::of); } /* diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 270ab61af..f5716c187 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -66,7 +66,7 @@ import org.elasticsearch.search.suggest.Suggest; import org.springframework.data.elasticsearch.client.ClientConfiguration; import org.springframework.data.elasticsearch.client.ElasticsearchHost; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.http.HttpHeaders; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -606,7 +606,7 @@ default Mono deleteBy(DeleteByQueryRequest deleteRequest) * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ - default Mono updateBy(Consumer consumer) { + default Mono updateBy(Consumer consumer) { final UpdateByQueryRequest request = new UpdateByQueryRequest(); consumer.accept(request); @@ -621,7 +621,7 @@ default Mono updateBy(Consumer cons * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ - default Mono updateBy(UpdateByQueryRequest updateRequest) { + default Mono updateBy(UpdateByQueryRequest updateRequest) { return updateBy(HttpHeaders.EMPTY, updateRequest); } @@ -634,7 +634,7 @@ default Mono updateBy(UpdateByQueryRequest updateRequest) * * Query API on elastic.co * @return a {@link Mono} emitting operation response. */ - Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest); + Mono updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest); /** * Execute a {@link BulkRequest} against the {@literal bulk} API. diff --git a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java index 81a8ef594..c802c044a 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java @@ -53,6 +53,7 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery; @@ -275,8 +276,8 @@ public String delete(String id, Class entityType) { } @Override - public void delete(Query query, Class clazz) { - delete(query, clazz, getIndexCoordinatesFor(clazz)); + public ByQueryResponse delete(Query query, Class clazz) { + return delete(query, clazz, getIndexCoordinatesFor(clazz)); } @Override diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index 5eec5eeff..91412c611 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -19,9 +19,9 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.routing.RoutingResolver; @@ -276,19 +276,21 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * @param query query defining the objects * @param clazz The entity class, must be annotated with * {@link org.springframework.data.elasticsearch.annotations.Document} + * @return response with detailed information * @since 4.1 */ - void delete(Query query, Class clazz); + ByQueryResponse delete(Query query, Class clazz); /** * Delete all records matching the query. - * + * * @param query query defining the objects * @param clazz The entity class, must be annotated with * {@link org.springframework.data.elasticsearch.annotations.Document} * @param index the index from which to delete + * @return response with detailed information */ - void delete(Query query, Class clazz, IndexCoordinates index); + ByQueryResponse delete(Query query, Class clazz, IndexCoordinates index); /** * Partial update of the document. @@ -307,5 +309,5 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * @return the update response * @since 4.2 */ - UpdateByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 72b1adb05..2b2583f87 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -49,9 +49,9 @@ import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.support.SearchHitsUtil; @@ -208,9 +208,9 @@ protected String doDelete(String id, @Nullable String routing, IndexCoordinates } @Override - public void delete(Query query, Class clazz, IndexCoordinates index) { + public ByQueryResponse delete(Query query, Class clazz, IndexCoordinates index) { DeleteByQueryRequest deleteByQueryRequest = requestFactory.deleteByQueryRequest(query, clazz, index); - execute(client -> client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT)); + return ByQueryResponse.of(execute(client -> client.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT))); } @Override @@ -231,7 +231,7 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { } @Override - public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { + public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); @@ -248,7 +248,7 @@ public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates i final BulkByScrollResponse bulkByScrollResponse = execute( client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT)); - return UpdateByQueryResponse.of(bulkByScrollResponse); + return ByQueryResponse.of(bulkByScrollResponse); } public List doBulkOperation(List queries, BulkOptions bulkOptions, diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 5727da50a..0123a5fac 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -47,9 +47,9 @@ import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.support.SearchHitsUtil; @@ -232,8 +232,8 @@ protected String doDelete(String id, @Nullable String routing, IndexCoordinates } @Override - public void delete(Query query, Class clazz, IndexCoordinates index) { - requestFactory.deleteByQueryRequestBuilder(client, query, clazz, index).get(); + public ByQueryResponse delete(Query query, Class clazz, IndexCoordinates index) { + return ByQueryResponse.of(requestFactory.deleteByQueryRequestBuilder(client, query, clazz, index).get()); } @Override @@ -260,7 +260,7 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) { } @Override - public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { + public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) { Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); @@ -275,7 +275,7 @@ public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates i // UpdateByQueryRequestBuilder has not parameters to set a routing value final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet(); - return UpdateByQueryResponse.of(bulkByScrollResponse); + return ByQueryResponse.of(bulkByScrollResponse); } public List doBulkOperation(List queries, BulkOptions bulkOptions, diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 3ddef607b..c3917e3e4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -24,8 +24,8 @@ import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.util.Assert; @@ -264,7 +264,7 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @param entityType must not be {@literal null}. * @return a {@link Mono} emitting the number of the removed documents. */ - Mono delete(Query query, Class entityType); + Mono delete(Query query, Class entityType); /** * Delete the documents matching the given {@link Query} extracting index from entity metadata. @@ -274,7 +274,7 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @param index the target index, must not be {@literal null} * @return a {@link Mono} emitting the number of the removed documents. */ - Mono delete(Query query, Class entityType, IndexCoordinates index); + Mono delete(Query query, Class entityType, IndexCoordinates index); /** * Partial update of the document. @@ -294,5 +294,5 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @return a {@link Mono} emitting the update response * @since 4.2 */ - Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 6734c54b0..7d2cf7b18 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -74,7 +74,7 @@ import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.UpdateQuery; import org.springframework.data.elasticsearch.core.query.UpdateResponse; import org.springframework.data.elasticsearch.core.routing.DefaultRoutingResolver; @@ -526,11 +526,11 @@ private Mono doDeleteById(String id, @Nullable String routing, IndexCoor * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#delete(Query, Class, IndexCoordinates) */ @Override - public Mono delete(Query query, Class entityType, IndexCoordinates index) { + public Mono delete(Query query, Class entityType, IndexCoordinates index) { Assert.notNull(query, "Query must not be null!"); - return doDeleteBy(query, entityType, index).map(BulkByScrollResponse::getDeleted).next(); + return doDeleteBy(query, entityType, index).map(ByQueryResponse::of); } @Override @@ -556,7 +556,7 @@ public Mono update(UpdateQuery updateQuery, IndexCoordinates ind } @Override - public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { Assert.notNull(updateQuery, "updateQuery must not be null"); Assert.notNull(index, "Index must not be null"); @@ -578,13 +578,13 @@ public Mono updateByQuery(UpdateQuery updateQuery, IndexC } @Override - public Mono delete(Query query, Class entityType) { + public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); } - private Flux doDeleteBy(Query query, Class entityType, IndexCoordinates index) { + private Mono doDeleteBy(Query query, Class entityType, IndexCoordinates index) { - return Flux.defer(() -> { + return Mono.defer(() -> { DeleteByQueryRequest request = requestFactory.deleteByQueryRequest(query, entityType, index); return doDeleteBy(prepareDeleteByRequest(request)); }); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/query/ByQueryResponse.java similarity index 83% rename from src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java rename to src/main/java/org/springframework/data/elasticsearch/core/query/ByQueryResponse.java index ac7b471cc..232c4313c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/UpdateByQueryResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/ByQueryResponse.java @@ -29,7 +29,7 @@ * @author Farid Faoudi * @since 4.2 */ -public class UpdateByQueryResponse { +public class ByQueryResponse { private final long took; private final boolean timedOut; @@ -44,7 +44,7 @@ public class UpdateByQueryResponse { @Nullable private final String reasonCancelled; private final List failures; - private UpdateByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + private ByQueryResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, long versionConflicts, long noops, long bulkRetries, long searchRetries, @Nullable String reasonCancelled, List failures) { this.took = took; @@ -149,21 +149,21 @@ public List getFailures() { } /** - * Create a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse} + * Create a new {@link ByQueryResponseBuilder} to build {@link ByQueryResponse} * - * @return a new {@link UpdateByQueryResponseBuilder} to build {@link UpdateByQueryResponse} + * @return a new {@link ByQueryResponseBuilder} to build {@link ByQueryResponse} */ - public static UpdateByQueryResponseBuilder builder() { - return new UpdateByQueryResponseBuilder(); + public static ByQueryResponseBuilder builder() { + return new ByQueryResponseBuilder(); } - public static UpdateByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) { + public static ByQueryResponse of(BulkByScrollResponse bulkByScrollResponse) { final List failures = bulkByScrollResponse.getBulkFailures() // .stream() // .map(Failure::of) // .collect(Collectors.toList()); // - return UpdateByQueryResponse.builder() // + return ByQueryResponse.builder() // .withTook(bulkByScrollResponse.getTook().getMillis()) // .withTimedOut(bulkByScrollResponse.isTimedOut()) // .withTotal(bulkByScrollResponse.getTotal()) // @@ -331,7 +331,7 @@ public Failure build() { } } - public static final class UpdateByQueryResponseBuilder { + public static final class ByQueryResponseBuilder { private long took; private boolean timedOut; private long total; @@ -345,71 +345,71 @@ public static final class UpdateByQueryResponseBuilder { @Nullable private String reasonCancelled; private List failures = Collections.emptyList(); - private UpdateByQueryResponseBuilder() {} + private ByQueryResponseBuilder() {} - public UpdateByQueryResponseBuilder withTook(long took) { + public ByQueryResponseBuilder withTook(long took) { this.took = took; return this; } - public UpdateByQueryResponseBuilder withTimedOut(boolean timedOut) { + public ByQueryResponseBuilder withTimedOut(boolean timedOut) { this.timedOut = timedOut; return this; } - public UpdateByQueryResponseBuilder withTotal(long total) { + public ByQueryResponseBuilder withTotal(long total) { this.total = total; return this; } - public UpdateByQueryResponseBuilder withUpdated(long updated) { + public ByQueryResponseBuilder withUpdated(long updated) { this.updated = updated; return this; } - public UpdateByQueryResponseBuilder withDeleted(long deleted) { + public ByQueryResponseBuilder withDeleted(long deleted) { this.deleted = deleted; return this; } - public UpdateByQueryResponseBuilder withBatches(int batches) { + public ByQueryResponseBuilder withBatches(int batches) { this.batches = batches; return this; } - public UpdateByQueryResponseBuilder withVersionConflicts(long versionConflicts) { + public ByQueryResponseBuilder withVersionConflicts(long versionConflicts) { this.versionConflicts = versionConflicts; return this; } - public UpdateByQueryResponseBuilder withNoops(long noops) { + public ByQueryResponseBuilder withNoops(long noops) { this.noops = noops; return this; } - public UpdateByQueryResponseBuilder withBulkRetries(long bulkRetries) { + public ByQueryResponseBuilder withBulkRetries(long bulkRetries) { this.bulkRetries = bulkRetries; return this; } - public UpdateByQueryResponseBuilder withSearchRetries(long searchRetries) { + public ByQueryResponseBuilder withSearchRetries(long searchRetries) { this.searchRetries = searchRetries; return this; } - public UpdateByQueryResponseBuilder withReasonCancelled(String reasonCancelled) { + public ByQueryResponseBuilder withReasonCancelled(String reasonCancelled) { this.reasonCancelled = reasonCancelled; return this; } - public UpdateByQueryResponseBuilder withFailures(List failures) { + public ByQueryResponseBuilder withFailures(List failures) { this.failures = failures; return this; } - public UpdateByQueryResponse build() { - return new UpdateByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, - bulkRetries, searchRetries, reasonCancelled, failures); + public ByQueryResponse build() { + return new ByQueryResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, reasonCancelled, failures); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java b/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java index ed6cd8866..a6c56f70b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/repository/query/AbstractReactiveElasticsearchRepositoryQuery.java @@ -25,6 +25,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingConverter; import org.springframework.data.elasticsearch.repository.query.ReactiveElasticsearchQueryExecution.ResultProcessingExecution; @@ -115,7 +116,8 @@ private ReactiveElasticsearchQueryExecution getExecutionToWrap(ElasticsearchPara ReactiveElasticsearchOperations operations) { if (isDeleteQuery()) { - return (query, type, targetType, indexCoordinates) -> operations.delete(query, type, indexCoordinates); + return (query, type, targetType, indexCoordinates) -> operations.delete(query, type, indexCoordinates) + .map(ByQueryResponse::getDeleted); } else if (isCountQuery()) { return (query, type, targetType, indexCoordinates) -> operations.count(query, type, indexCoordinates); } else if (isExistsQuery()) { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java index b6eb439bc..0aa6d93fc 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClientIntegrationTests.java @@ -67,7 +67,7 @@ import org.springframework.data.elasticsearch.core.ReactiveIndexOperations; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; import org.springframework.http.HttpHeaders; @@ -478,7 +478,7 @@ void updateByEmitResultWhenNothingUpdated() { .setScript(new Script(ScriptType.INLINE, "painless", script, params)); // client.updateBy(request) // - .map(UpdateByQueryResponse::getUpdated) // + .map(ByQueryResponse::getUpdated) // .as(StepVerifier::create) // .expectNext(2L) // .verifyComplete(); // @@ -510,7 +510,7 @@ void updateByShouldUpdateExistingDocument() { .setScript(new Script(ScriptType.INLINE, "painless", script, params)); // client.updateBy(request) // - .map(UpdateByQueryResponse::getUpdated) // + .map(ByQueryResponse::getUpdated) // .as(StepVerifier::create) // .expectNext(0L) // .verifyComplete(); // diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index d713adf9c..ff50200ab 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -630,7 +630,9 @@ public void deleteByQueryShouldReturnZeroWhenIndexDoesNotExist() { template.delete(query, SampleEntity.class) // .as(StepVerifier::create) // - .expectNext(0L) // + .consumeNextWith(byQueryResponse -> { + assertThat(byQueryResponse.getDeleted()).isEqualTo(0L); + }) .verifyComplete(); } @@ -654,6 +656,7 @@ public void shouldDeleteAcrossIndex() { .build(); template.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) // + .map(ByQueryResponse::getDeleted) // .as(StepVerifier::create) // .expectNext(2L) // .verifyComplete(); @@ -681,6 +684,7 @@ public void shouldDeleteAcrossIndexWhenNoMatchingDataPresent() { .build(); template.delete(searchQuery, SampleEntity.class, IndexCoordinates.of(indexPrefix + '*')) // + .map(ByQueryResponse::getDeleted) // .as(StepVerifier::create) // .expectNext(0L) // .verifyComplete(); @@ -696,6 +700,7 @@ public void deleteByQueryShouldReturnNumberOfDeletedDocuments() { CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("test")); template.delete(query, SampleEntity.class) // + .map(ByQueryResponse::getDeleted) // .as(StepVerifier::create) // .expectNext(2L) // .verifyComplete(); @@ -709,6 +714,7 @@ public void deleteByQueryShouldReturnZeroIfNothingDeleted() { CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("luke")); template.delete(query, SampleEntity.class) // + .map(ByQueryResponse::getDeleted) // .as(StepVerifier::create) // .expectNext(0L) // .verifyComplete(); diff --git a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java index 146633f6f..b71e4cbd0 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryTests.java @@ -23,6 +23,7 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier;