Skip to content

Introduce cluster operations #1768

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -105,6 +107,7 @@
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
import org.springframework.data.elasticsearch.client.NoReachableHostException;
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster;
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
import org.springframework.data.elasticsearch.client.util.NamedXContents;
import org.springframework.data.elasticsearch.client.util.ScrollState;
Expand Down Expand Up @@ -142,7 +145,7 @@
* @see ClientConfiguration
* @see ReactiveRestClients
*/
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster {

private final HostProvider<?> hostProvider;
private final RequestCreator requestCreator;
Expand Down Expand Up @@ -297,10 +300,6 @@ public void setHeadersSupplier(Supplier<HttpHeaders> headersSupplier) {
this.headersSupplier = headersSupplier;
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<Boolean> ping(HttpHeaders headers) {

Expand All @@ -309,21 +308,13 @@ public Mono<Boolean> ping(HttpHeaders headers) {
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders)
*/
@Override
public Mono<MainResponse> info(HttpHeaders headers) {

return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) //
.next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {

Expand All @@ -341,10 +332,6 @@ public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest
.flatMap(Flux::fromArray); //
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
*/
@Override
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {

Expand All @@ -353,48 +340,33 @@ public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
.next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest)
*/
@Override
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
*/
@Override
public Indices indices() {
return this;
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
*/
@Override
public Cluster cluster() {
return this;
}

@Override
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest)
*/
@Override
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {

return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
.next();
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Mono<Long> count(HttpHeaders headers, SearchRequest searchRequest) {
searchRequest.source().trackTotalHits(true);
Expand All @@ -412,10 +384,6 @@ public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest
.map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {

Expand All @@ -435,10 +403,6 @@ public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
.map(SearchResponse::getSuggest);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {

Expand All @@ -453,10 +417,6 @@ public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequ
.flatMap(Flux::fromIterable);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
*/
@Override
public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {

Expand Down Expand Up @@ -506,10 +466,6 @@ private Publisher<?> cleanupScroll(HttpHeaders headers, ScrollState state) {
return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
*/
@Override
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {

Expand All @@ -524,10 +480,6 @@ public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest
.map(ByQueryResponse::of);
}

/*
* (non-Javadoc)
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
*/
@Override
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) //
Expand Down Expand Up @@ -812,6 +764,14 @@ public Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getI

// endregion

// region cluster operations
@Override
public Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) {
return sendRequest(clusterHealthRequest, requestCreator.clusterHealth(), ClusterHealthResponse.class, headers)
.next();
}
// endregion

// region helper functions
private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
Class<T> responseType) {
Expand Down Expand Up @@ -965,7 +925,7 @@ private static ElasticsearchException getElasticsearchException(String content,
} while (token == XContentParser.Token.FIELD_NAME);

return null;
} catch (IOException e) {
} catch (Exception e) {
return new ElasticsearchStatusException(content, status);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Collection;
import java.util.function.Consumer;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -269,6 +271,14 @@ default Mono<IndexResponse> index(IndexRequest indexRequest) {
*/
Indices indices();

/**
* Gain Access to cluster related commands.
*
* @return Cluster implementations
* @since 4.2
*/
Cluster cluster();

/**
* Execute an {@link UpdateRequest} against the {@literal update} API to alter a document.
*
Expand Down Expand Up @@ -1678,4 +1688,45 @@ default Mono<GetIndexResponse> getIndex(GetIndexRequest getIndexRequest) {
*/
Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getIndexRequest);
}

/**
* Encapsulation of methods for accessing the Cluster API.
*
* @author Peter-Josef Meisch
* @since 4.2
*/
interface Cluster {

/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param consumer never {@literal null}.
* @return Mono emitting the {@link ClusterHealthResponse}.
*/
default Mono<ClusterHealthResponse> health(Consumer<ClusterHealthRequest> consumer) {

ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();
consumer.accept(clusterHealthRequest);
return health(clusterHealthRequest);
}

/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
* {@link ClusterHealthResponse}.
*/
default Mono<ClusterHealthResponse> health(ClusterHealthRequest clusterHealthRequest) {
return health(HttpHeaders.EMPTY, clusterHealthRequest);
}

/**
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
*
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
* {@link ClusterHealthResponse}.
*/
Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.function.Function;

import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
Expand Down Expand Up @@ -266,4 +267,11 @@ default Function<GetFieldMappingsRequest, Request> getFieldMapping() {
default Function<GetIndexRequest, Request> getIndex() {
return RequestConverters::getIndex;
}

/**
* @since 4.2
*/
default Function<ClusterHealthRequest, Request> clusterHealth() {
return RequestConverters::clusterHealth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@

/**
* <p>
* Original implementation source {@link org.elasticsearch.client.RequestConverters} and
* {@link org.elasticsearch.client.IndicesRequestConverters} by {@literal Elasticsearch}
* Original implementation source {@link org.elasticsearch.client.RequestConverters},
* {@link org.elasticsearch.client.IndicesRequestConverters} and
* {@link org.elasticsearch.client.ClusterRequestConverters} by {@literal Elasticsearch}
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
* </p>
* Modified for usage with {@link ReactiveElasticsearchClient}.
Expand Down Expand Up @@ -1003,6 +1004,26 @@ public static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRe
return request;
}

public static Request clusterHealth(ClusterHealthRequest healthRequest) {
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = new EndpointBuilder().addPathPartAsIs(new String[] { "_cluster/health" })
.addCommaSeparatedPathParts(indices).build();

Request request = new Request("GET", endpoint);

RequestConverters.Params parameters = new Params(request);
parameters.withWaitForStatus(healthRequest.waitForStatus());
parameters.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards());
parameters.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards());
parameters.withWaitForActiveShards(healthRequest.waitForActiveShards(), ActiveShardCount.NONE);
parameters.withWaitForNodes(healthRequest.waitForNodes());
parameters.withWaitForEvents(healthRequest.waitForEvents());
parameters.withTimeout(healthRequest.timeout());
parameters.withMasterTimeout(healthRequest.masterNodeTimeout());
parameters.withLocal(healthRequest.local()).withLevel(healthRequest.level());
return request;
}

static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) {

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.Objects;

import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
Expand All @@ -39,25 +40,34 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera

/**
* get an {@link IndexOperations} that is bound to the given class
*
*
* @return IndexOperations
*/
IndexOperations indexOps(Class<?> clazz);

/**
* get an {@link IndexOperations} that is bound to the given index
*
*
* @return IndexOperations
*/
IndexOperations indexOps(IndexCoordinates index);

/**
* return a {@link ClusterOperations} instance that uses the same client communication setup as this
* ElasticsearchOperations instance.
*
* @return ClusterOperations implementation
* @since 4.2
*/
ClusterOperations cluster();

ElasticsearchConverter getElasticsearchConverter();

IndexCoordinates getIndexCoordinatesFor(Class<?> clazz);

/**
* gets the routing for an entity which might be defined by a join-type relation
*
*
* @param entity the entity
* @return the routing, may be null if not set.
* @since 4.1
Expand All @@ -68,7 +78,7 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
// region helper
/**
* gets the String representation for an id.
*
*
* @param id
* @return
* @since 4.0
Expand Down
Loading