diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index b4c511b24..4c4891321 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -47,7 +47,6 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionRequest; -import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; @@ -79,6 +78,8 @@ import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Request; +import org.elasticsearch.client.core.CountRequest; +import org.elasticsearch.client.core.CountResponse; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -328,6 +329,17 @@ public Mono delete(HttpHeaders headers, DeleteRequest deleteRequ .publishNext(); } + /* + * (non-Javadoc) + * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) + */ + @Override + public Mono count(HttpHeaders headers, CountRequest countRequest) { + return sendRequest(countRequest, RequestCreator.count(), CountResponse.class, headers) // + .map(CountResponse::getCount) // + .next(); + } + /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) @@ -572,13 +584,12 @@ private static GetResult getResponseToGetResult(GetResponse response) { // --> - private Flux sendRequest(Req request, - Function converter, Class responseType, HttpHeaders headers) { + private Flux sendRequest(Req request, Function converter, + Class responseType, HttpHeaders headers) { return sendRequest(converter.apply(request), responseType, headers); } - private Flux sendRequest(Request request, Class responseType, - HttpHeaders headers) { + private Flux sendRequest(Request request, Class responseType, HttpHeaders headers) { String logId = ClientLogger.newLogId(); @@ -807,6 +818,10 @@ static Function flushIndex() { return RequestConverters::flushIndex; } + static Function count() { + return RequestConverters::count; + } + } /** diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 474845771..d7ccc4049 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; @@ -59,6 +60,7 @@ * * @author Christoph Strobl * @author Mark Paluch + * @author Peter-Josef Meisch * @author Henrique Amaral * @since 3.2 * @see ClientConfiguration @@ -331,6 +333,47 @@ default Mono delete(DeleteRequest deleteRequest) { */ Mono delete(HttpHeaders headers, DeleteRequest deleteRequest); + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param consumer new {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + default Mono count(Consumer consumer) { + + CountRequest countRequest = new CountRequest(); + consumer.accept(countRequest); + return count(countRequest); + } + + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param countRequest must not be {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + default Mono count(CountRequest countRequest) { + return count(HttpHeaders.EMPTY, countRequest); + } + + /** + * Execute a {@link SearchRequest} against the {@literal count} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param countRequest must not be {@literal null}. + * @see Count API on + * elastic.co + * @return the {@link Mono} emitting the count result. + * @since 4.0 + */ + Mono count(HttpHeaders headers, CountRequest countRequest); + /** * Execute a {@link SearchRequest} against the {@literal search} API. * diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 47d1f26df..a11ebf830 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -59,6 +59,7 @@ import org.elasticsearch.client.Request; import org.elasticsearch.client.Requests; import org.elasticsearch.client.RethrottleRequest; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.client.indices.AnalyzeRequest; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Priority; @@ -388,6 +389,32 @@ public static Request search(SearchRequest searchRequest) { return request; } + /** + * Creates a count request. + * + * @param countRequest the search defining the data to be counted + * @return Elasticsearch count request + * @since 4.0 + */ + public static Request count(CountRequest countRequest) { + Request request = new Request(HttpMethod.POST.name(), + endpoint(countRequest.indices(), countRequest.types(), "_count")); + + Params params = new Params(request); + addCountRequestParams(params, countRequest); + + if (countRequest.source() != null) { + request.setEntity(createEntity(countRequest.source(), REQUEST_BODY_CONTENT_TYPE)); + } + return request; + } + + private static void addCountRequestParams(Params params, CountRequest countRequest) { + params.withRouting(countRequest.routing()); + params.withPreference(countRequest.preference()); + params.withIndicesOptions(countRequest.indicesOptions()); + } + private static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.putParam("typed_keys", "true"); params.withRouting(searchRequest.routing()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index cb9dcdcc2..9f69c08a3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -35,6 +35,7 @@ import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.Requests; +import org.elasticsearch.client.core.CountRequest; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -228,85 +229,138 @@ private Flux doFind(Query query, ElasticsearchPersistentEntity ent @Nullable String type) { return Flux.defer(() -> { + SearchRequest request = prepareSearchRequest(buildSearchRequest(query, entity, index, type)); - IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); - SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName)); - request.types(indexTypes(query, indexCoordinates::getTypeName)); + if (query.getPageable().isPaged() || query.isLimiting()) { + return doFind(request); + } else { + return doScroll(request); + } + }); + } - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(mappedQuery(query, entity)); - searchSourceBuilder.version(entity.hasVersionProperty()); - searchSourceBuilder.trackScores(query.getTrackScores()); + @Override + public Mono count(Query query, Class entityType, String index, String type) { + return doCount(query, getPersistentEntity(entityType), index, type); + } - QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); - if (postFilterQuery != null) { - searchSourceBuilder.postFilter(postFilterQuery); - } + private Mono doCount(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { + return Mono.defer(() -> { - if (query.getSourceFilter() != null) { - searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); - } + CountRequest countRequest = buildCountRequest(query, entity, index, type); + CountRequest request = prepareCountRequest(countRequest); + return doCount(request); + }); - if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { - searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); - } + } - sort(query, entity).forEach(searchSourceBuilder::sort); + private CountRequest buildCountRequest(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { - if (query.getMinScore() > 0) { - searchSourceBuilder.minScore(query.getMinScore()); - } + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + CountRequest request = new CountRequest(indices(query, indexCoordinates::getIndexName)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(mappedQuery(query, entity)); + searchSourceBuilder.trackScores(query.getTrackScores()); - if (query.getIndicesOptions() != null) { - request.indicesOptions(query.getIndicesOptions()); - } + QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); + if (postFilterQuery != null) { + searchSourceBuilder.postFilter(postFilterQuery); + } - if (query.getPreference() != null) { - request.preference(query.getPreference()); - } + if (query.getSourceFilter() != null) { + searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); + } - if (query.getSearchType() != null) { - request.searchType(query.getSearchType()); - } + if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { + searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); + } - Pageable pageable = query.getPageable(); + sort(query, entity).forEach(searchSourceBuilder::sort); - if (pageable.isPaged()) { + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } - long offset = pageable.getOffset(); - if (offset > Integer.MAX_VALUE) { - throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); - } + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } - searchSourceBuilder.from((int) offset); - searchSourceBuilder.size(pageable.getPageSize()); + if (query.getPreference() != null) { + request.preference(query.getPreference()); + } + request.source(searchSourceBuilder); + return request; + } - request.source(searchSourceBuilder); - return doFind(prepareSearchRequest(request)); - } else if (query.isLimiting()) { - searchSourceBuilder.from(0); - searchSourceBuilder.size(query.getMaxResults()); + private SearchRequest buildSearchRequest(Query query, ElasticsearchPersistentEntity entity, @Nullable String index, + @Nullable String type) { + IndexCoordinates indexCoordinates = operations.determineIndex(entity, index, type); + SearchRequest request = new SearchRequest(indices(query, indexCoordinates::getIndexName)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(mappedQuery(query, entity)); + searchSourceBuilder.version(entity.hasVersionProperty()); + searchSourceBuilder.trackScores(query.getTrackScores()); + + QueryBuilder postFilterQuery = mappedFilterQuery(query, entity); + if (postFilterQuery != null) { + searchSourceBuilder.postFilter(postFilterQuery); + } - request.source(searchSourceBuilder); - return doFind(prepareSearchRequest(request)); - } else { + if (query.getSourceFilter() != null) { + searchSourceBuilder.fetchSource(query.getSourceFilter().getIncludes(), query.getSourceFilter().getExcludes()); + } + + if (query instanceof SearchQuery && ((SearchQuery) query).getCollapseBuilder() != null) { + searchSourceBuilder.collapse(((SearchQuery) query).getCollapseBuilder()); + } + + sort(query, entity).forEach(searchSourceBuilder::sort); + + if (query.getMinScore() > 0) { + searchSourceBuilder.minScore(query.getMinScore()); + } + + if (query.getIndicesOptions() != null) { + request.indicesOptions(query.getIndicesOptions()); + } + + if (query.getPreference() != null) { + request.preference(query.getPreference()); + } - request.source(searchSourceBuilder); - return doScan(prepareSearchRequest(request)); + if (query.getSearchType() != null) { + request.searchType(query.getSearchType()); + } + + Pageable pageable = query.getPageable(); + + if (pageable.isPaged()) { + + long offset = pageable.getOffset(); + if (offset > Integer.MAX_VALUE) { + throw new IllegalArgumentException(String.format("Offset must not be more than %s", Integer.MAX_VALUE)); } - }); - } + searchSourceBuilder.from((int) offset); + searchSourceBuilder.size(pageable.getPageSize()); + + request.source(searchSourceBuilder); + } else if (query.isLimiting()) { + searchSourceBuilder.from(0); + searchSourceBuilder.size(query.getMaxResults()); + + request.source(searchSourceBuilder); + } else { + request.source(searchSourceBuilder); + } + return request; + } /* * (non-Javadoc) * @see org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations#count(Query, Class, String, String) */ - @Override - public Mono count(Query query, Class entityType, String index, String type) { - - // TODO: ES 7.0 has a dedicated CountRequest - use that one once available. - return find(query, entityType, index, type).count(); - } /* * (non-Javadoc) @@ -443,6 +497,22 @@ protected IndexRequest prepareIndexRequest(Object source, IndexRequest request) return prepareWriteRequest(request); } + /** + * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the + * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. + * + * @param request the generated {@link CountRequest}. + * @return never {@literal null}. + */ + protected CountRequest prepareCountRequest(CountRequest request) { + + if (indicesOptions == null) { + return request; + } + + return request.indicesOptions(indicesOptions); + } + /** * Customization hook to modify a generated {@link SearchRequest} prior to its execution. Eg. by setting the * {@link SearchRequest#indicesOptions(IndicesOptions) indices options} if applicable. @@ -543,16 +613,32 @@ protected Flux doFind(SearchRequest request) { .onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); } + /** + * Customization hook on the actual execution result {@link Publisher}.
+ * + * @param request the already prepared {@link CountRequest} ready to be executed. + * @return a {@link Mono} emitting the result of the operation. + */ + protected Mono doCount(CountRequest request) { + + if (QUERY_LOGGER.isDebugEnabled()) { + QUERY_LOGGER.debug("Executing doCount: {}", request); + } + + return Mono.from(execute(client -> client.count(request))) // + .onErrorResume(NoSuchIndexException.class, it -> Mono.empty()); + } + /** * Customization hook on the actual execution result {@link Publisher}.
* * @param request the already prepared {@link SearchRequest} ready to be executed. * @return a {@link Flux} emitting the result of the operation. */ - protected Flux doScan(SearchRequest request) { + protected Flux doScroll(SearchRequest request) { if (QUERY_LOGGER.isDebugEnabled()) { - QUERY_LOGGER.debug("Executing doScan: {}", request); + QUERY_LOGGER.debug("Executing doScroll: {}", request); } return Flux.from(execute(client -> client.scroll(request))) //