From b258302ce59fbaf657d705827ca444339a307dcb Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sun, 28 Nov 2021 19:17:41 +0100 Subject: [PATCH] Add reactive SearchHits implementation --- .../reference/elasticsearch-operations.adoc | 3 + .../ReactiveElasticsearchTemplate.java | 39 +++++++-- .../document/SearchDocumentResponse.java | 12 +-- .../core/ReactiveSearchHitSupport.java | 33 ++++++++ .../core/ReactiveSearchHits.java | 75 +++++++++++++++++ .../core/ReactiveSearchHitsImpl.java | 81 +++++++++++++++++++ .../core/ReactiveSearchOperations.java | 60 +++++++++++++- ...ElasticsearchTemplateIntegrationTests.java | 53 +++++++++--- 8 files changed, 332 insertions(+), 24 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitSupport.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java diff --git a/src/main/asciidoc/reference/elasticsearch-operations.adoc b/src/main/asciidoc/reference/elasticsearch-operations.adoc index e5bfc4708..6b3b33fd3 100644 --- a/src/main/asciidoc/reference/elasticsearch-operations.adoc +++ b/src/main/asciidoc/reference/elasticsearch-operations.adoc @@ -144,6 +144,9 @@ Returned by the low level scroll API functions in `ElasticsearchRestTemplate`, i .SearchHitsIterator An Iterator returned by the streaming functions of the `SearchOperations` interface. +.ReactiveSearchHits +`ReactiveSearchOperations` has methods returning a `Mono>`, this contains the same information as a `SearchHits` object, but will provide the contained `SearchHit` objects as a `Flux>` and not as a list. + [[elasticsearch.operations.queries]] == Queries diff --git a/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/ReactiveElasticsearchTemplate.java index dc297f21e..35e76a51b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/ReactiveElasticsearchTemplate.java @@ -754,6 +754,31 @@ public Mono> searchForPage(Query query, Class entityType, C .map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable())); } + @Override + public Mono> searchForHits(Query query, Class entityType, Class resultType) { + return searchForHits(query, entityType, resultType, getIndexCoordinatesFor(entityType)); + } + + @Override + public Mono> searchForHits(Query query, Class entityType, Class resultType, + IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + Assert.notNull(entityType, "entityType must not be null"); + Assert.notNull(resultType, "resultType must not be null"); + Assert.notNull(index, "index must not be null"); + + SearchDocumentCallback callback = new ReadSearchDocumentCallback<>(resultType, index); + + return doFindForResponse(query, entityType, index) // + .flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) // + .flatMap(callback::toEntity) // + .collectList() // + .map(entities -> SearchHitMapping.mappingFor(resultType, converter) // + .mapHits(searchDocumentResponse, entities))) // + .map(ReactiveSearchHitSupport::searchHitsFor); + } + private Flux doFind(Query query, Class clazz, IndexCoordinates index) { return Flux.defer(() -> { @@ -777,8 +802,9 @@ private Mono doFindForResponse(Query query, Class cla request = prepareSearchRequest(request, false); SearchDocumentCallback documentCallback = new ReadSearchDocumentCallback<>(clazz, index); - - return doFindForResponse(request, searchDocument -> documentCallback.toEntity(searchDocument).block()); + Function entityCreator = searchDocument -> documentCallback.toEntity(searchDocument) + .block(); + return doFindForResponse(request, entityCreator); }); } @@ -895,19 +921,18 @@ protected Flux doFind(SearchRequest request) { * Customization hook on the actual execution result {@link Mono}.
* * @param request the already prepared {@link SearchRequest} ready to be executed. - * @param suggestEntityCreator + * @param entityCreator * @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}. */ protected Mono doFindForResponse(SearchRequest request, - Function suggestEntityCreator) { + Function entityCreator) { if (QUERY_LOGGER.isDebugEnabled()) { QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request)); } - return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(searchResponse -> { - return SearchDocumentResponse.from(searchResponse, suggestEntityCreator); - }); + return Mono.from(execute(client -> client.searchForResponse(request))) + .map(searchResponse -> SearchDocumentResponse.from(searchResponse, entityCreator)); } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/document/SearchDocumentResponse.java b/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/document/SearchDocumentResponse.java index 69b55dc9e..1da9a560d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/document/SearchDocumentResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/backend/elasticsearch7/document/SearchDocumentResponse.java @@ -99,12 +99,12 @@ public Suggest getSuggest() { * creates a SearchDocumentResponse from the {@link SearchResponse} * * @param searchResponse must not be {@literal null} - * @param suggestEntityCreator function to create an entity from a {@link SearchDocument} + * @param entityCreator function to create an entity from a {@link SearchDocument} * @param entity type * @return the SearchDocumentResponse */ public static SearchDocumentResponse from(SearchResponse searchResponse, - Function suggestEntityCreator) { + Function entityCreator) { Assert.notNull(searchResponse, "searchResponse must not be null"); @@ -113,7 +113,7 @@ public static SearchDocumentResponse from(SearchResponse searchResponse, Aggregations aggregations = searchResponse.getAggregations(); org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest(); - return from(searchHits, scrollId, aggregations, suggest, suggestEntityCreator); + return from(searchHits, scrollId, aggregations, suggest, entityCreator); } /** @@ -123,14 +123,14 @@ public static SearchDocumentResponse from(SearchResponse searchResponse, * @param scrollId scrollId * @param aggregations aggregations * @param suggestES the suggestion response from Elasticsearch - * @param suggestEntityCreator function to create an entity from a {@link SearchDocument} + * @param entityCreator function to create an entity from a {@link SearchDocument} * @param entity type * @return the {@link SearchDocumentResponse} * @since 4.3 */ public static SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId, @Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES, - Function suggestEntityCreator) { + Function entityCreator) { TotalHits responseTotalHits = searchHits.getTotalHits(); @@ -154,7 +154,7 @@ public static SearchDocumentResponse from(SearchHits searchHits, @Nullable S } } - Suggest suggest = suggestFrom(suggestES, suggestEntityCreator); + Suggest suggest = suggestFrom(suggestES, entityCreator); return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations, suggest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitSupport.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitSupport.java new file mode 100644 index 000000000..57b568dba --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitSupport.java @@ -0,0 +1,33 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import org.springframework.util.Assert; + +/** + * @author Peter-Josef Meisch + * @since 4.4 + */ +public final class ReactiveSearchHitSupport { + private ReactiveSearchHitSupport() {} + + public static ReactiveSearchHits searchHitsFor(SearchHits searchHits) { + + Assert.notNull(searchHits, "searchHits must not be null"); + + return new ReactiveSearchHitsImpl<>(searchHits); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java new file mode 100644 index 000000000..1e69e7d49 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHits.java @@ -0,0 +1,75 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import reactor.core.publisher.Flux; + +import org.springframework.data.elasticsearch.core.suggest.response.Suggest; +import org.springframework.lang.Nullable; + +/** + * Encapsulates a Flux of {@link SearchHit}s with additional information from the search. + * + * @param the result data class. + * @author Peter-Josef Meisch + * @since 4.4 + */ +public interface ReactiveSearchHits { + + /** + * @return the aggregations. + */ + @Nullable + AggregationsContainer getAggregations(); + + float getMaxScore(); + + /** + * @return the {@link SearchHit}s from the search result. + */ + Flux> getSearchHits(); + + /** + * @return the number of total hits. + */ + long getTotalHits(); + + /** + * @return the relation for the total hits + */ + TotalHitsRelation getTotalHitsRelation(); + + /** + * @return true if aggregations are available + */ + boolean hasAggregations(); + + /** + * @return whether the {@link SearchHits} has search hits. + */ + boolean hasSearchHits(); + + /** + * @return the suggest response + */ + @Nullable + Suggest getSuggest(); + + /** + * @return wether the {@link SearchHits} has a suggest response. + */ + boolean hasSuggest(); +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java new file mode 100644 index 000000000..0eea0d7de --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchHitsImpl.java @@ -0,0 +1,81 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core; + +import reactor.core.publisher.Flux; + +import org.springframework.data.elasticsearch.core.suggest.response.Suggest; +import org.springframework.lang.Nullable; + +/** + * @author Peter-Josef Meisch + * @since 4.4 + */ +public class ReactiveSearchHitsImpl implements ReactiveSearchHits { + + protected final SearchHits delegate; + + public ReactiveSearchHitsImpl(SearchHits delegate) { + this.delegate = delegate; + } + + @Override + public long getTotalHits() { + return delegate.getTotalHits(); + } + + @Override + public TotalHitsRelation getTotalHitsRelation() { + return delegate.getTotalHitsRelation(); + } + + @Override + public boolean hasAggregations() { + return delegate.hasAggregations(); + } + + @Override + @Nullable + public AggregationsContainer getAggregations() { + return delegate.getAggregations(); + } + + @Override + public float getMaxScore() { + return delegate.getMaxScore(); + } + + @Override + public boolean hasSearchHits() { + return delegate.hasSearchHits(); + } + + @Override + public Flux> getSearchHits() { + return Flux.defer(() -> Flux.fromIterable(delegate.getSearchHits())); + } + + @Override + @Nullable + public Suggest getSuggest() { + return delegate.getSuggest(); + } + + @Override + public boolean hasSuggest() { + return delegate.hasSuggest(); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java index 50afff454..9b3ac6029 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveSearchOperations.java @@ -15,7 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import org.springframework.data.elasticsearch.backend.elasticsearch7.query.NativeSearchQuery; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -23,6 +22,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder; import org.springframework.data.domain.Pageable; +import org.springframework.data.elasticsearch.backend.elasticsearch7.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.suggest.response.Suggest; @@ -171,12 +171,68 @@ default Mono> searchForPage(Query query, Class entityType, * @param entityType must not be {@literal null}. * @param resultType the projection result type. * @param index the target index, must not be {@literal null} - * @param * @return a {@link Mono} emitting matching entities in a {@link SearchHits}. * @since 4.1 */ Mono> searchForPage(Query query, Class entityType, Class resultType, IndexCoordinates index); + /** + * Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and + * which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method. + * + * @param the result type class + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information + * @since 4.4 + */ + default Mono> searchForHits(Query query, Class entityType) { + return searchForHits(query, entityType, entityType); + } + + /** + * Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and + * which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method. + * + * @param the result type class + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param resultType the projection result type. + * @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information + * @since 4.4 + */ + Mono> searchForHits(Query query, Class entityType, Class resultType); + + /** + * Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and + * which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method. + * + * @param the result type class + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param index the target index, must not be {@literal null} + * @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information + * @since 4.4 + */ + default Mono> searchForHits(Query query, Class entityType, IndexCoordinates index) { + return searchForHits(query, entityType, entityType, index); + } + + /** + * Perform a search and return the {@link ReactiveSearchHits} which contains information about the search results and + * which will provide the documents by the {@link ReactiveSearchHits#getSearchHits()} method. + * + * @param the result type class + * @param query must not be {@literal null}. + * @param entityType must not be {@literal null}. + * @param resultType the projection result type. + * @param index the target index, must not be {@literal null} + * @return a {@link Mono} emitting the {@link ReactiveSearchHits} that contains the search result information + * @since 4.4 + */ + Mono> searchForHits(Query query, Class entityType, Class resultType, + IndexCoordinates index); + /** * Perform an aggregation specified by the given {@link Query query}.
* diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 466c4085c..568495e15 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -110,6 +110,7 @@ * @author Roman Puchkovskiy * @author George Popides */ +@SuppressWarnings("SpringJavaAutowiredMembersInspection") @SpringIntegrationTest public class ReactiveElasticsearchTemplateIntegrationTests { @@ -1175,6 +1176,31 @@ void shouldWorkWithImmutableClasses() { assertThat(retrieved).isEqualTo(savedEntity.get()); }).verifyComplete(); } + + @Test // #2015 + @DisplayName("should return Mono of ReactiveSearchHits") + void shouldReturnMonoOfReactiveSearchHits() { + List entities = new ArrayList<>(); + for (int i = 1; i <= 20; i++) { + entities.add(randomEntity("message " + i)); + } + + Query query = Query.findAll().setPageable(PageRequest.of(0, 7)); + + operations.saveAll(Mono.just(entities), SampleEntity.class).then().block(); + + Mono> searchHitsMono = operations.searchForHits(query, SampleEntity.class); + + searchHitsMono.as(StepVerifier::create) // + .consumeNextWith(reactiveSearchHits -> { + assertThat(reactiveSearchHits.getTotalHits()).isEqualTo(20); + reactiveSearchHits.getSearchHits().as(StepVerifier::create) // + .expectNextCount(7) // + .verifyComplete(); // + }) // + .verifyComplete(); + } + // endregion // region Helper functions @@ -1249,10 +1275,13 @@ public int hashCode() { @Document(indexName = "#{@indexNameProvider.indexName()}") static class SampleEntity { - @Nullable @Id private String id; - @Nullable @Field(type = Text, store = true, fielddata = true) private String message; + @Nullable + @Id private String id; + @Nullable + @Field(type = Text, store = true, fielddata = true) private String message; @Nullable private int rate; - @Nullable @Version private Long version; + @Nullable + @Version private Long version; @Nullable public String getId() { @@ -1324,7 +1353,8 @@ public int hashCode() { @Document(indexName = "#{@indexNameProvider.indexName()}") static class OptimisticEntity { - @Nullable @Id private String id; + @Nullable + @Id private String id; @Nullable private String message; @Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm; @@ -1358,10 +1388,12 @@ public void setSeqNoPrimaryTerm(@Nullable SeqNoPrimaryTerm seqNoPrimaryTerm) { @Document(indexName = "#{@indexNameProvider.indexName()}") static class OptimisticAndVersionedEntity { - @Nullable @Id private String id; + @Nullable + @Id private String id; @Nullable private String message; @Nullable private SeqNoPrimaryTerm seqNoPrimaryTerm; - @Nullable @Version private Long version; + @Nullable + @Version private Long version; @Nullable public String getId() { @@ -1402,8 +1434,10 @@ public void setVersion(@Nullable java.lang.Long version) { @Document(indexName = "#{@indexNameProvider.indexName()}") static class VersionedEntity { - @Nullable @Id private String id; - @Nullable @Version private Long version; + @Nullable + @Id private String id; + @Nullable + @Version private Long version; @Nullable public String getId() { @@ -1428,7 +1462,8 @@ public void setVersion(@Nullable java.lang.Long version) { @Setting(settingPath = "settings/test-settings.json") @Mapping(mappingPath = "mappings/test-mappings.json") private static class EntityWithSettingsAndMappingsReactive { - @Nullable @Id String id; + @Nullable + @Id String id; @Nullable public String getId() {