Skip to content

DATAES-680 - ReactiveElasticsearchTemplate-should-use-the-count-API. #341

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
Expand Down Expand Up @@ -79,6 +78,8 @@
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
Expand Down Expand Up @@ -328,6 +329,17 @@ public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequ
.publishNext();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Mono<Long> count(HttpHeaders headers, CountRequest countRequest) {
return sendRequest(countRequest, RequestCreator.count(), CountResponse.class, headers) //
.map(CountResponse::getCount) //
.next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
Expand Down Expand Up @@ -572,13 +584,12 @@ private static GetResult getResponseToGetResult(GetResponse response) {

// -->

private <Req extends ActionRequest, Resp extends ActionResponse> Flux<Resp> sendRequest(Req request,
Function<Req, Request> converter, Class<Resp> responseType, HttpHeaders headers) {
private <Req extends ActionRequest, Resp> Flux<Resp> sendRequest(Req request, Function<Req, Request> converter,
Class<Resp> responseType, HttpHeaders headers) {
return sendRequest(converter.apply(request), responseType, headers);
}

private <AR extends ActionResponse> Flux<AR> sendRequest(Request request, Class<AR> responseType,
HttpHeaders headers) {
private <Resp> Flux<Resp> sendRequest(Request request, Class<Resp> responseType, HttpHeaders headers) {

String logId = ClientLogger.newLogId();

Expand Down Expand Up @@ -807,6 +818,10 @@ static Function<FlushRequest, Request> flushIndex() {
return RequestConverters::flushIndex;
}

static Function<CountRequest, Request> count() {
return RequestConverters::count;
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.index.get.GetResult;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
Expand All @@ -59,6 +60,7 @@
*
* @author Christoph Strobl
* @author Mark Paluch
* @author Peter-Josef Meisch
* @author Henrique Amaral
* @since 3.2
* @see ClientConfiguration
Expand Down Expand Up @@ -331,6 +333,47 @@ default Mono<DeleteResponse> delete(DeleteRequest deleteRequest) {
*/
Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest);

/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param consumer new {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
default Mono<Long> count(Consumer<CountRequest> consumer) {

CountRequest countRequest = new CountRequest();
consumer.accept(countRequest);
return count(countRequest);
}

/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param countRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
default Mono<Long> count(CountRequest countRequest) {
return count(HttpHeaders.EMPTY, countRequest);
}

/**
* Execute a {@link SearchRequest} against the {@literal count} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param countRequest must not be {@literal null}.
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html">Count API on
* elastic.co</a>
* @return the {@link Mono} emitting the count result.
* @since 4.0
*/
Mono<Long> count(HttpHeaders headers, CountRequest countRequest);

/**
* Execute a {@link SearchRequest} against the {@literal search} API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RethrottleRequest;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.indices.AnalyzeRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Priority;
Expand Down Expand Up @@ -388,6 +389,32 @@ public static Request search(SearchRequest searchRequest) {
return request;
}

/**
* Creates a count request.
*
* @param countRequest the search defining the data to be counted
* @return Elasticsearch count request
* @since 4.0
*/
public static Request count(CountRequest countRequest) {
Request request = new Request(HttpMethod.POST.name(),
endpoint(countRequest.indices(), countRequest.types(), "_count"));

Params params = new Params(request);
addCountRequestParams(params, countRequest);

if (countRequest.source() != null) {
request.setEntity(createEntity(countRequest.source(), REQUEST_BODY_CONTENT_TYPE));
}
return request;
}

private static void addCountRequestParams(Params params, CountRequest countRequest) {
params.withRouting(countRequest.routing());
params.withPreference(countRequest.preference());
params.withIndicesOptions(countRequest.indicesOptions());
}

private static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam("typed_keys", "true");
params.withRouting(searchRequest.routing());
Expand Down
Loading