Skip to content

Add reactive SearchHits. #2017

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/asciidoc/reference/elasticsearch-operations.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ Returned by the low level scroll API functions in `ElasticsearchRestTemplate`, i
.SearchHitsIterator<T>
An Iterator returned by the streaming functions of the `SearchOperations` interface.

.ReactiveSearchHits
`ReactiveSearchOperations` has methods returning a `Mono<ReactiveSearchHits<T>>`, this contains the same information as a `SearchHits<T>` object, but will provide the contained `SearchHit<T>` objects as a `Flux<SearchHit<T>>` and not as a list.

[[elasticsearch.operations.queries]]
== Queries

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,31 @@ public <T> Mono<SearchPage<T>> searchForPage(Query query, Class<?> entityType, C
.map(searchHits -> SearchHitSupport.searchPageFor(searchHits, query.getPageable()));
}

@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType) {
return searchForHits(query, entityType, resultType, getIndexCoordinatesFor(entityType));
}

@Override
public <T> Mono<ReactiveSearchHits<T>> searchForHits(Query query, Class<?> entityType, Class<T> resultType,
IndexCoordinates index) {

Assert.notNull(query, "query must not be null");
Assert.notNull(entityType, "entityType must not be null");
Assert.notNull(resultType, "resultType must not be null");
Assert.notNull(index, "index must not be null");

SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>(resultType, index);

return doFindForResponse(query, entityType, index) //
.flatMap(searchDocumentResponse -> Flux.fromIterable(searchDocumentResponse.getSearchDocuments()) //
.flatMap(callback::toEntity) //
.collectList() //
.map(entities -> SearchHitMapping.mappingFor(resultType, converter) //
.mapHits(searchDocumentResponse, entities))) //
.map(ReactiveSearchHitSupport::searchHitsFor);
}

private Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {

return Flux.defer(() -> {
Expand All @@ -777,8 +802,9 @@ private Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<?> cla
request = prepareSearchRequest(request, false);

SearchDocumentCallback<?> documentCallback = new ReadSearchDocumentCallback<>(clazz, index);

return doFindForResponse(request, searchDocument -> documentCallback.toEntity(searchDocument).block());
Function<SearchDocument, Object> entityCreator = searchDocument -> documentCallback.toEntity(searchDocument)
.block();
return doFindForResponse(request, entityCreator);
});
}

Expand Down Expand Up @@ -895,19 +921,18 @@ protected Flux<SearchDocument> doFind(SearchRequest request) {
* Customization hook on the actual execution result {@link Mono}. <br />
*
* @param request the already prepared {@link SearchRequest} ready to be executed.
* @param suggestEntityCreator
* @param entityCreator
* @return a {@link Mono} emitting the result of the operation converted to s {@link SearchDocumentResponse}.
*/
protected Mono<SearchDocumentResponse> doFindForResponse(SearchRequest request,
Function<SearchDocument, ? extends Object> suggestEntityCreator) {
Function<SearchDocument, ? extends Object> entityCreator) {

if (QUERY_LOGGER.isDebugEnabled()) {
QUERY_LOGGER.debug(String.format("Executing doFindForResponse: %s", request));
}

return Mono.from(execute(client1 -> client1.searchForResponse(request))).map(searchResponse -> {
return SearchDocumentResponse.from(searchResponse, suggestEntityCreator);
});
return Mono.from(execute(client -> client.searchForResponse(request)))
.map(searchResponse -> SearchDocumentResponse.from(searchResponse, entityCreator));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,12 @@ public Suggest getSuggest() {
* creates a SearchDocumentResponse from the {@link SearchResponse}
*
* @param searchResponse must not be {@literal null}
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument}
* @param entityCreator function to create an entity from a {@link SearchDocument}
* @param <T> entity type
* @return the SearchDocumentResponse
*/
public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
Function<SearchDocument, T> suggestEntityCreator) {
Function<SearchDocument, T> entityCreator) {

Assert.notNull(searchResponse, "searchResponse must not be null");

Expand All @@ -113,7 +113,7 @@ public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
Aggregations aggregations = searchResponse.getAggregations();
org.elasticsearch.search.suggest.Suggest suggest = searchResponse.getSuggest();

return from(searchHits, scrollId, aggregations, suggest, suggestEntityCreator);
return from(searchHits, scrollId, aggregations, suggest, entityCreator);
}

/**
Expand All @@ -123,14 +123,14 @@ public static <T> SearchDocumentResponse from(SearchResponse searchResponse,
* @param scrollId scrollId
* @param aggregations aggregations
* @param suggestES the suggestion response from Elasticsearch
* @param suggestEntityCreator function to create an entity from a {@link SearchDocument}
* @param entityCreator function to create an entity from a {@link SearchDocument}
* @param <T> entity type
* @return the {@link SearchDocumentResponse}
* @since 4.3
*/
public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable String scrollId,
@Nullable Aggregations aggregations, @Nullable org.elasticsearch.search.suggest.Suggest suggestES,
Function<SearchDocument, T> suggestEntityCreator) {
Function<SearchDocument, T> entityCreator) {

TotalHits responseTotalHits = searchHits.getTotalHits();

Expand All @@ -154,7 +154,7 @@ public static <T> SearchDocumentResponse from(SearchHits searchHits, @Nullable S
}
}

Suggest suggest = suggestFrom(suggestES, suggestEntityCreator);
Suggest suggest = suggestFrom(suggestES, entityCreator);
return new SearchDocumentResponse(totalHits, totalHitsRelation, maxScore, scrollId, searchDocuments, aggregations,
suggest);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

import org.springframework.util.Assert;

/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public final class ReactiveSearchHitSupport {
private ReactiveSearchHitSupport() {}

public static <T> ReactiveSearchHits<T> searchHitsFor(SearchHits<T> searchHits) {

Assert.notNull(searchHits, "searchHits must not be null");

return new ReactiveSearchHitsImpl<>(searchHits);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

import reactor.core.publisher.Flux;

import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.lang.Nullable;

/**
* Encapsulates a Flux of {@link SearchHit}s with additional information from the search.
*
* @param <T> the result data class.
* @author Peter-Josef Meisch
* @since 4.4
*/
public interface ReactiveSearchHits<T> {

/**
* @return the aggregations.
*/
@Nullable
AggregationsContainer<?> getAggregations();

float getMaxScore();

/**
* @return the {@link SearchHit}s from the search result.
*/
Flux<SearchHit<T>> getSearchHits();

/**
* @return the number of total hits.
*/
long getTotalHits();

/**
* @return the relation for the total hits
*/
TotalHitsRelation getTotalHitsRelation();

/**
* @return true if aggregations are available
*/
boolean hasAggregations();

/**
* @return whether the {@link SearchHits} has search hits.
*/
boolean hasSearchHits();

/**
* @return the suggest response
*/
@Nullable
Suggest getSuggest();

/**
* @return wether the {@link SearchHits} has a suggest response.
*/
boolean hasSuggest();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core;

import reactor.core.publisher.Flux;

import org.springframework.data.elasticsearch.core.suggest.response.Suggest;
import org.springframework.lang.Nullable;

/**
* @author Peter-Josef Meisch
* @since 4.4
*/
public class ReactiveSearchHitsImpl<T> implements ReactiveSearchHits<T> {

protected final SearchHits<T> delegate;

public ReactiveSearchHitsImpl(SearchHits<T> delegate) {
this.delegate = delegate;
}

@Override
public long getTotalHits() {
return delegate.getTotalHits();
}

@Override
public TotalHitsRelation getTotalHitsRelation() {
return delegate.getTotalHitsRelation();
}

@Override
public boolean hasAggregations() {
return delegate.hasAggregations();
}

@Override
@Nullable
public AggregationsContainer<?> getAggregations() {
return delegate.getAggregations();
}

@Override
public float getMaxScore() {
return delegate.getMaxScore();
}

@Override
public boolean hasSearchHits() {
return delegate.hasSearchHits();
}

@Override
public Flux<SearchHit<T>> getSearchHits() {
return Flux.defer(() -> Flux.fromIterable(delegate.getSearchHits()));
}

@Override
@Nullable
public Suggest getSuggest() {
return delegate.getSuggest();
}

@Override
public boolean hasSuggest() {
return delegate.hasSuggest();
}
}
Loading