diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 6203997aa..a7e69670f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -45,6 +45,8 @@ import org.apache.http.util.EntityUtils; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -105,6 +107,7 @@ import org.springframework.data.elasticsearch.client.ElasticsearchHost; import org.springframework.data.elasticsearch.client.NoReachableHostException; import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification; +import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices; import org.springframework.data.elasticsearch.client.util.NamedXContents; import org.springframework.data.elasticsearch.client.util.ScrollState; @@ -142,7 +145,7 @@ * @see ClientConfiguration * @see ReactiveRestClients */ -public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices { +public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster { private final HostProvider hostProvider; private final RequestCreator requestCreator; @@ -297,10 +300,6 @@ public void setHeadersSupplier(Supplier headersSupplier) { this.headersSupplier = headersSupplier; } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders) - */ @Override public Mono ping(HttpHeaders headers) { @@ -309,10 +308,6 @@ public Mono ping(HttpHeaders headers) { .onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders) - */ @Override public Mono info(HttpHeaders headers) { @@ -320,10 +315,6 @@ public Mono info(HttpHeaders headers) { .next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest) - */ @Override public Mono get(HttpHeaders headers, GetRequest getRequest) { @@ -341,10 +332,6 @@ public Flux multiGet(HttpHeaders headers, MultiGetRequest .flatMap(Flux::fromArray); // } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest) - */ @Override public Mono exists(HttpHeaders headers, GetRequest getRequest) { @@ -353,37 +340,26 @@ public Mono exists(HttpHeaders headers, GetRequest getRequest) { .next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest) - */ @Override public Mono index(HttpHeaders headers, IndexRequest indexRequest) { return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices() - */ @Override public Indices indices() { return this; } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest) - */ + @Override + public Cluster cluster() { + return this; + } + @Override public Mono update(HttpHeaders headers, UpdateRequest updateRequest) { return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest) - */ @Override public Mono delete(HttpHeaders headers, DeleteRequest deleteRequest) { @@ -391,10 +367,6 @@ public Mono delete(HttpHeaders headers, DeleteRequest deleteRequ .next(); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) - */ @Override public Mono count(HttpHeaders headers, SearchRequest searchRequest) { searchRequest.source().trackTotalHits(true); @@ -412,10 +384,6 @@ public Flux searchTemplate(HttpHeaders headers, SearchTemplateRequest .map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) - */ @Override public Flux search(HttpHeaders headers, SearchRequest searchRequest) { @@ -435,10 +403,6 @@ public Flux suggest(HttpHeaders headers, SearchRequest searchRequest) { .map(SearchResponse::getSuggest); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) - */ @Override public Flux aggregate(HttpHeaders headers, SearchRequest searchRequest) { @@ -453,10 +417,6 @@ public Flux aggregate(HttpHeaders headers, SearchRequest searchRequ .flatMap(Flux::fromIterable); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest) - */ @Override public Flux scroll(HttpHeaders headers, SearchRequest searchRequest) { @@ -506,10 +466,6 @@ private Publisher cleanupScroll(HttpHeaders headers, ScrollState state) { return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest) - */ @Override public Mono deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) { @@ -524,10 +480,6 @@ public Mono updateBy(HttpHeaders headers, UpdateByQueryRequest .map(ByQueryResponse::of); } - /* - * (non-Javadoc) - * @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest) - */ @Override public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) // @@ -812,6 +764,14 @@ public Mono getIndex(HttpHeaders headers, GetIndexRequest getI // endregion + // region cluster operations + @Override + public Mono health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) { + return sendRequest(clusterHealthRequest, requestCreator.clusterHealth(), ClusterHealthResponse.class, headers) + .next(); + } + // endregion + // region helper functions private Publisher readResponseBody(String logId, Request request, ClientResponse response, Class responseType) { @@ -965,7 +925,7 @@ private static ElasticsearchException getElasticsearchException(String content, } while (token == XContentParser.Token.FIELD_NAME); return null; - } catch (IOException e) { + } catch (Exception e) { return new ElasticsearchStatusException(content, status); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index c8f4ae9f2..201513ef0 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -22,6 +22,8 @@ import java.util.Collection; import java.util.function.Consumer; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -269,6 +271,14 @@ default Mono index(IndexRequest indexRequest) { */ Indices indices(); + /** + * Gain Access to cluster related commands. + * + * @return Cluster implementations + * @since 4.2 + */ + Cluster cluster(); + /** * Execute an {@link UpdateRequest} against the {@literal update} API to alter a document. * @@ -1678,4 +1688,45 @@ default Mono getIndex(GetIndexRequest getIndexRequest) { */ Mono getIndex(HttpHeaders headers, GetIndexRequest getIndexRequest); } + + /** + * Encapsulation of methods for accessing the Cluster API. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ + interface Cluster { + + /** + * Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API. + * + * @param consumer never {@literal null}. + * @return Mono emitting the {@link ClusterHealthResponse}. + */ + default Mono health(Consumer consumer) { + + ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest(); + consumer.accept(clusterHealthRequest); + return health(clusterHealthRequest); + } + + /** + * Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API. + * + * @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the + * {@link ClusterHealthResponse}. + */ + default Mono health(ClusterHealthRequest clusterHealthRequest) { + return health(HttpHeaders.EMPTY, clusterHealthRequest); + } + + /** + * Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the + * {@link ClusterHealthResponse}. + */ + Mono health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 1d5363137..61f5ac25e 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.function.Function; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest; import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; @@ -266,4 +267,11 @@ default Function getFieldMapping() { default Function getIndex() { return RequestConverters::getIndex; } + + /** + * @since 4.2 + */ + default Function clusterHealth() { + return RequestConverters::clusterHealth; + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index ea2e5df32..f77b77c82 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -107,8 +107,9 @@ /** *

- * Original implementation source {@link org.elasticsearch.client.RequestConverters} and - * {@link org.elasticsearch.client.IndicesRequestConverters} by {@literal Elasticsearch} + * Original implementation source {@link org.elasticsearch.client.RequestConverters}, + * {@link org.elasticsearch.client.IndicesRequestConverters} and + * {@link org.elasticsearch.client.ClusterRequestConverters} by {@literal Elasticsearch} * (https://www.elastic.co) licensed under the Apache License, Version 2.0. *

* Modified for usage with {@link ReactiveElasticsearchClient}. @@ -1003,6 +1004,26 @@ public static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRe return request; } + public static Request clusterHealth(ClusterHealthRequest healthRequest) { + String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices(); + String endpoint = new EndpointBuilder().addPathPartAsIs(new String[] { "_cluster/health" }) + .addCommaSeparatedPathParts(indices).build(); + + Request request = new Request("GET", endpoint); + + RequestConverters.Params parameters = new Params(request); + parameters.withWaitForStatus(healthRequest.waitForStatus()); + parameters.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards()); + parameters.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards()); + parameters.withWaitForActiveShards(healthRequest.waitForActiveShards(), ActiveShardCount.NONE); + parameters.withWaitForNodes(healthRequest.waitForNodes()); + parameters.withWaitForEvents(healthRequest.waitForEvents()); + parameters.withTimeout(healthRequest.timeout()); + parameters.withMasterTimeout(healthRequest.masterNodeTimeout()); + parameters.withLocal(healthRequest.local()).withLevel(healthRequest.level()); + return request; + } + static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) { try { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index 29a26270e..557dc1fbc 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -17,6 +17,7 @@ import java.util.Objects; +import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.routing.RoutingResolver; @@ -39,25 +40,34 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera /** * get an {@link IndexOperations} that is bound to the given class - * + * * @return IndexOperations */ IndexOperations indexOps(Class clazz); /** * get an {@link IndexOperations} that is bound to the given index - * + * * @return IndexOperations */ IndexOperations indexOps(IndexCoordinates index); + /** + * return a {@link ClusterOperations} instance that uses the same client communication setup as this + * ElasticsearchOperations instance. + * + * @return ClusterOperations implementation + * @since 4.2 + */ + ClusterOperations cluster(); + ElasticsearchConverter getElasticsearchConverter(); IndexCoordinates getIndexCoordinatesFor(Class clazz); /** * gets the routing for an entity which might be defined by a join-type relation - * + * * @param entity the entity * @return the routing, may be null if not set. * @since 4.1 @@ -68,7 +78,7 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera // region helper /** * gets the String representation for an id. - * + * * @param id * @return * @since 4.0 diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 18360dfc6..f4826db23 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -44,6 +44,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; @@ -142,6 +143,13 @@ public IndexOperations indexOps(IndexCoordinates index) { } // endregion + // region ClusterOperations + @Override + public ClusterOperations cluster() { + return ClusterOperations.forTemplate(this); + } + // endregion + // region DocumentOperations public String doIndex(IndexQuery query, IndexCoordinates index) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index d0d4852a5..70911a12e 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -41,6 +41,7 @@ import org.elasticsearch.search.suggest.SuggestBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.data.elasticsearch.core.cluster.ClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; @@ -142,6 +143,13 @@ public IndexOperations indexOps(IndexCoordinates index) { } // endregion + // region ClusterOperations + @Override + public ClusterOperations cluster() { + return ClusterOperations.forTemplate(this); + } + // endregion + // region getter/setter @Nullable public String getSearchTimeout() { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java index 37862b212..11bea3f97 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchOperations.java @@ -17,6 +17,7 @@ import org.reactivestreams.Publisher; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; +import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -56,6 +57,16 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati */ Publisher executeWithIndicesClient(IndicesClientCallback> callback); + /** + * Execute within a {@link ClusterClientCallback} managing resources and translating errors. + * + * @param callback must not be {@literal null}. + * @param the type the Publisher emits + * @return the {@link Publisher} emitting results. + * @since 4.1 + */ + Publisher executeWithClusterClient(ClusterClientCallback> callback); + /** * Get the {@link ElasticsearchConverter} used. * @@ -75,6 +86,7 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati /** * Creates a {@link ReactiveIndexOperations} that is bound to the given index + * * @param index IndexCoordinates specifying the index * @return ReactiveIndexOperations implementation * @since 4.1 @@ -83,13 +95,23 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati /** * Creates a {@link ReactiveIndexOperations} that is bound to the given class + * * @param clazz the entity clazz specifiying the index information * @return ReactiveIndexOperations implementation * @since 4.1 */ ReactiveIndexOperations indexOps(Class clazz); - //region routing + /** + * return a {@link ReactiveClusterOperations} instance that uses the same client communication setup as this + * ElasticsearchOperations instance. + * + * @return ClusterOperations implementation + * @since 4.2 + */ + ReactiveClusterOperations cluster(); + + // region routing /** * Returns a copy of this instance with the same configuration, but that uses a different {@link RoutingResolver} to * obtain routing information. @@ -98,7 +120,7 @@ public interface ReactiveElasticsearchOperations extends ReactiveDocumentOperati * @return DocumentOperations instance */ ReactiveElasticsearchOperations withRouting(RoutingResolver routingResolver); - //endregion + // endregion /** * Callback interface to be used with {@link #execute(ClientCallback)} for operating directly on @@ -114,8 +136,8 @@ interface ClientCallback> { } /** - * Callback interface to be used with {@link #executeWithIndicesClient(IndicesClientCallback)} for operating directly on - * {@link ReactiveElasticsearchClient.Indices}. + * Callback interface to be used with {@link #executeWithIndicesClient(IndicesClientCallback)} for operating directly + * on {@link ReactiveElasticsearchClient.Indices}. * * @param the return type * @since 4.1 @@ -123,4 +145,15 @@ interface ClientCallback> { interface IndicesClientCallback> { T doWithClient(ReactiveElasticsearchClient.Indices client); } + + /** + * Callback interface to be used with {@link #executeWithClusterClient(ClusterClientCallback)} for operating directly + * on {@link ReactiveElasticsearchClient.Cluster}. + * + * @param the return type + * @since 4.2 + */ + interface ClusterClientCallback> { + T doWithClient(ReactiveElasticsearchClient.Cluster client); + } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 925f6c948..47d129f6c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -57,6 +57,8 @@ import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient; import org.springframework.data.elasticsearch.core.EntityOperations.AdaptibleEntity; +import org.springframework.data.elasticsearch.core.cluster.DefaultReactiveClusterOperations; +import org.springframework.data.elasticsearch.core.cluster.ReactiveClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.Document; @@ -921,6 +923,11 @@ public Publisher executeWithIndicesClient(IndicesClientCallback callback.doWithClient(getIndicesClient())).onErrorMap(this::translateException); } + @Override + public Publisher executeWithClusterClient(ClusterClientCallback> callback) { + return Flux.defer(() -> callback.doWithClient(getClusterClient())).onErrorMap(this::translateException); + } + @Override public ElasticsearchConverter getElasticsearchConverter() { return converter; @@ -936,6 +943,11 @@ public ReactiveIndexOperations indexOps(Class clazz) { return new DefaultReactiveIndexOperations(this, clazz); } + @Override + public ReactiveClusterOperations cluster() { + return new DefaultReactiveClusterOperations(this); + } + @Override public IndexCoordinates getIndexCoordinatesFor(Class clazz) { return getPersistentEntityFor(clazz).getIndexCoordinates(); @@ -970,7 +982,19 @@ protected ReactiveElasticsearchClient.Indices getIndicesClient() { throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Indices implementation available"); } - // endregion + /** + * Obtain the {@link ReactiveElasticsearchClient.Cluster} to operate upon. + * + * @return never {@literal null}. + */ + protected ReactiveElasticsearchClient.Cluster getClusterClient() { + + if (client instanceof ReactiveElasticsearchClient.Cluster) { + return (ReactiveElasticsearchClient.Cluster) client; + } + + throw new UncategorizedElasticsearchException("No ReactiveElasticsearchClient.Cluster implementation available"); + } /** * translates an Exception if possible. Exceptions that are no {@link RuntimeException}s are wrapped in a diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index 546109602..1b8dfb73b 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -26,6 +26,7 @@ import java.util.Set; import java.util.stream.Collectors; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.compress.CompressedXContent; +import org.springframework.data.elasticsearch.core.cluster.ClusterHealth; import org.springframework.data.elasticsearch.core.document.Document; import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; @@ -109,7 +111,7 @@ public static List getIndexInformations(GetIndexResponse getIn * @return a document that represents {@link Settings} */ private static Settings settingsFromGetIndexResponse(GetIndexResponse getIndexResponse, String indexName) { - Settings settings= new Settings(); + Settings settings = new Settings(); org.elasticsearch.common.settings.Settings indexSettings = getIndexResponse.getSettings().get(indexName); @@ -289,4 +291,26 @@ public static MultiGetItem.Failure getFailure(MultiGetItemResponse itemResponse) } // endregion + // region cluster operations + public static ClusterHealth clusterHealth(ClusterHealthResponse clusterHealthResponse) { + return ClusterHealth.builder() // + .withActivePrimaryShards(clusterHealthResponse.getActivePrimaryShards()) // + .withActiveShards(clusterHealthResponse.getActiveShards()) // + .withActiveShardsPercent(clusterHealthResponse.getActiveShardsPercent()) // + .withClusterName(clusterHealthResponse.getClusterName()) // + .withDelayedUnassignedShards(clusterHealthResponse.getDelayedUnassignedShards()) // + .withInitializingShards(clusterHealthResponse.getInitializingShards()) // + .withNumberOfDataNodes(clusterHealthResponse.getNumberOfDataNodes()) // + .withNumberOfInFlightFetch(clusterHealthResponse.getNumberOfInFlightFetch()) // + .withNumberOfNodes(clusterHealthResponse.getNumberOfNodes()) // + .withNumberOfPendingTasks(clusterHealthResponse.getNumberOfPendingTasks()) // + .withRelocatingShards(clusterHealthResponse.getRelocatingShards()) // + .withStatus(clusterHealthResponse.getStatus().toString()) // + .withTaskMaxWaitingTimeMillis(clusterHealthResponse.getTaskMaxWaitingTime().millis()) // + .withTimedOut(clusterHealthResponse.isTimedOut()) // + .withUnassignedShards(clusterHealthResponse.getUnassignedShards()) // + .build(); // + + } + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterHealth.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterHealth.java new file mode 100644 index 000000000..a2acc29af --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterHealth.java @@ -0,0 +1,248 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +/** + * Information about the cluster health. Contains currently only the top level elements returned from Elasticsearch. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public class ClusterHealth { + + private final String clusterName; + private final String status; + private final int numberOfNodes; + private final int numberOfDataNodes; + private final int activeShards; + private final int relocatingShards; + private final int activePrimaryShards; + private final int initializingShards; + private final int unassignedShards; + private final double activeShardsPercent; + private final int numberOfPendingTasks; + private final boolean timedOut; + private final int numberOfInFlightFetch; + private final int delayedUnassignedShards; + private final long taskMaxWaitingTimeMillis; + + private ClusterHealth(String clusterName, String status, int numberOfNodes, int numberOfDataNodes, int activeShards, + int relocatingShards, int activePrimaryShards, int initializingShards, int unassignedShards, + double activeShardsPercent, int numberOfPendingTasks, boolean timedOut, int numberOfInFlightFetch, + int delayedUnassignedShards, long taskMaxWaitingTimeMillis) { + this.clusterName = clusterName; + this.status = status; + this.numberOfNodes = numberOfNodes; + this.numberOfDataNodes = numberOfDataNodes; + this.activeShards = activeShards; + this.relocatingShards = relocatingShards; + this.activePrimaryShards = activePrimaryShards; + this.initializingShards = initializingShards; + this.unassignedShards = unassignedShards; + this.activeShardsPercent = activeShardsPercent; + this.numberOfPendingTasks = numberOfPendingTasks; + this.timedOut = timedOut; + this.numberOfInFlightFetch = numberOfInFlightFetch; + this.delayedUnassignedShards = delayedUnassignedShards; + this.taskMaxWaitingTimeMillis = taskMaxWaitingTimeMillis; + } + + public String getClusterName() { + return clusterName; + } + + public String getStatus() { + return status; + } + + public int getNumberOfNodes() { + return numberOfNodes; + } + + public int getNumberOfDataNodes() { + return numberOfDataNodes; + } + + public int getActiveShards() { + return activeShards; + } + + public int getRelocatingShards() { + return relocatingShards; + } + + public int getActivePrimaryShards() { + return activePrimaryShards; + } + + public int getInitializingShards() { + return initializingShards; + } + + public int getUnassignedShards() { + return unassignedShards; + } + + public double getActiveShardsPercent() { + return activeShardsPercent; + } + + public int getNumberOfPendingTasks() { + return numberOfPendingTasks; + } + + public boolean isTimedOut() { + return timedOut; + } + + public int getNumberOfInFlightFetch() { + return numberOfInFlightFetch; + } + + public int getDelayedUnassignedShards() { + return delayedUnassignedShards; + } + + public long getTaskMaxWaitingTimeMillis() { + return taskMaxWaitingTimeMillis; + } + + @Override + public String toString() { + return "ClusterHealth{" + + "clusterName='" + clusterName + '\'' + + ", status='" + status + '\'' + + ", numberOfNodes=" + numberOfNodes + + ", numberOfDataNodes=" + numberOfDataNodes + + ", activeShards=" + activeShards + + ", relocatingShards=" + relocatingShards + + ", activePrimaryShards=" + activePrimaryShards + + ", initializingShards=" + initializingShards + + ", unassignedShards=" + unassignedShards + + ", activeShardsPercent=" + activeShardsPercent + + ", numberOfPendingTasks=" + numberOfPendingTasks + + ", timedOut=" + timedOut + + ", numberOfInFlightFetch=" + numberOfInFlightFetch + + ", delayedUnassignedShards=" + delayedUnassignedShards + + ", taskMaxWaitingTimeMillis=" + taskMaxWaitingTimeMillis + + '}'; + } + + public static ClusterHealthBuilder builder() { + return new ClusterHealthBuilder(); + } + + public static final class ClusterHealthBuilder { + private String clusterName = ""; + private String status = ""; + private int numberOfNodes; + private int numberOfDataNodes; + private int activeShards; + private int relocatingShards; + private int activePrimaryShards; + private int initializingShards; + private int unassignedShards; + private double activeShardsPercent; + private int numberOfPendingTasks; + private boolean timedOut; + private int numberOfInFlightFetch; + private int delayedUnassignedShards; + private long taskMaxWaitingTimeMillis; + + private ClusterHealthBuilder() {} + + public ClusterHealthBuilder withClusterName(String clusterName) { + this.clusterName = clusterName; + return this; + } + + public ClusterHealthBuilder withStatus(String status) { + this.status = status; + return this; + } + + public ClusterHealthBuilder withNumberOfNodes(int numberOfNodes) { + this.numberOfNodes = numberOfNodes; + return this; + } + + public ClusterHealthBuilder withNumberOfDataNodes(int numberOfDataNodes) { + this.numberOfDataNodes = numberOfDataNodes; + return this; + } + + public ClusterHealthBuilder withActiveShards(int activeShards) { + this.activeShards = activeShards; + return this; + } + + public ClusterHealthBuilder withRelocatingShards(int relocatingShards) { + this.relocatingShards = relocatingShards; + return this; + } + + public ClusterHealthBuilder withActivePrimaryShards(int activePrimaryShards) { + this.activePrimaryShards = activePrimaryShards; + return this; + } + + public ClusterHealthBuilder withInitializingShards(int initializingShards) { + this.initializingShards = initializingShards; + return this; + } + + public ClusterHealthBuilder withUnassignedShards(int unassignedShards) { + this.unassignedShards = unassignedShards; + return this; + } + + public ClusterHealthBuilder withActiveShardsPercent(double activeShardsPercent) { + this.activeShardsPercent = activeShardsPercent; + return this; + } + + public ClusterHealthBuilder withNumberOfPendingTasks(int numberOfPendingTasks) { + this.numberOfPendingTasks = numberOfPendingTasks; + return this; + } + + public ClusterHealthBuilder withTimedOut(boolean timedOut) { + this.timedOut = timedOut; + return this; + } + + public ClusterHealthBuilder withNumberOfInFlightFetch(int numberOfInFlightFetch) { + this.numberOfInFlightFetch = numberOfInFlightFetch; + return this; + } + + public ClusterHealthBuilder withDelayedUnassignedShards(int delayedUnassignedShards) { + this.delayedUnassignedShards = delayedUnassignedShards; + return this; + } + + public ClusterHealthBuilder withTaskMaxWaitingTimeMillis(long taskMaxWaitingTimeMillis) { + this.taskMaxWaitingTimeMillis = taskMaxWaitingTimeMillis; + return this; + } + + public ClusterHealth build() { + return new ClusterHealth(clusterName, status, numberOfNodes, numberOfDataNodes, activeShards, relocatingShards, + activePrimaryShards, initializingShards, unassignedShards, activeShardsPercent, numberOfPendingTasks, + timedOut, numberOfInFlightFetch, delayedUnassignedShards, taskMaxWaitingTimeMillis); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperations.java new file mode 100644 index 000000000..5a6919a20 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperations.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; +import org.springframework.util.Assert; + +/** + * Elasticsearch operations on cluster level. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public interface ClusterOperations { + + /** + * Creates a ClusterOperations for a {@link ElasticsearchRestTemplate}. + * + * @param template the template, must not be {@literal null} + * @return ClusterOperations + */ + static ClusterOperations forTemplate(ElasticsearchRestTemplate template) { + + Assert.notNull(template, "template must not be null"); + + return new DefaultClusterOperations(template); + } + + /** + * Creates a ClusterOperations for a {@link ElasticsearchTemplate}. + * + * @param template the template, must not be {@literal null} + * @return ClusterOperations + */ + static ClusterOperations forTemplate(ElasticsearchTemplate template) { + + Assert.notNull(template, "template must not be null"); + + return new DefaultTransportClusterOperations(template); + } + + /** + * get the cluster's health status. + * + * @return health information for the cluster. + */ + ClusterHealth health(); +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultClusterOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultClusterOperations.java new file mode 100644 index 000000000..0c0590a27 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultClusterOperations.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.client.RequestOptions; +import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate; +import org.springframework.data.elasticsearch.core.ResponseConverter; + +/** + * Default implementation of {@link ClusterOperations} using the {@link ElasticsearchRestTemplate}. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +class DefaultClusterOperations implements ClusterOperations { + + private final ElasticsearchRestTemplate template; + + DefaultClusterOperations(ElasticsearchRestTemplate template) { + this.template = template; + } + + @Override + public ClusterHealth health() { + + ClusterHealthResponse clusterHealthResponse = template + .execute(client -> client.cluster().health(new ClusterHealthRequest(), RequestOptions.DEFAULT)); + return ResponseConverter.clusterHealth(clusterHealthResponse); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultReactiveClusterOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultReactiveClusterOperations.java new file mode 100644 index 000000000..689b18545 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultReactiveClusterOperations.java @@ -0,0 +1,42 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import reactor.core.publisher.Mono; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; +import org.springframework.data.elasticsearch.core.ResponseConverter; + +/** + * Default implementation of {@link ReactiveClusterOperations} using the {@link ReactiveElasticsearchOperations}. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public class DefaultReactiveClusterOperations implements ReactiveClusterOperations { + private final ReactiveElasticsearchOperations operations; + + public DefaultReactiveClusterOperations(ReactiveElasticsearchOperations operations) { + this.operations = operations; + } + + @Override + public Mono health() { + return Mono.from(operations.executeWithClusterClient( + client -> client.health(new ClusterHealthRequest()).map(ResponseConverter::clusterHealth))); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultTransportClusterOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultTransportClusterOperations.java new file mode 100644 index 000000000..9edb916e2 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/DefaultTransportClusterOperations.java @@ -0,0 +1,45 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.springframework.data.elasticsearch.core.ElasticsearchTemplate; +import org.springframework.data.elasticsearch.core.ResponseConverter; + +/** + * Default implementation of {@link ClusterOperations} using the + * {@link org.elasticsearch.client.transport.TransportClient}. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public class DefaultTransportClusterOperations implements ClusterOperations { + + private final ElasticsearchTemplate template; + + public DefaultTransportClusterOperations(ElasticsearchTemplate template) { + this.template = template; + } + + @Override + public ClusterHealth health() { + + ClusterHealthResponse clusterHealthResponse = template.getClient().admin().cluster() + .health(new ClusterHealthRequest()).actionGet(); + return ResponseConverter.clusterHealth(clusterHealthResponse); + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperations.java new file mode 100644 index 000000000..85d561d5d --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperations.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import reactor.core.publisher.Mono; + +/** + * Reactive Elasticsearch operations on cluster level. + * + * @author Peter-Josef Meisch + * @since 4.2 + */ +public interface ReactiveClusterOperations { + + /** + * get the cluster's health status. + * + * @return a Mono emitting the health information for the cluster. + */ + Mono health(); +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/cluster/package-info.java b/src/main/java/org/springframework/data/elasticsearch/core/cluster/package-info.java new file mode 100644 index 000000000..6d49233c5 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/cluster/package-info.java @@ -0,0 +1,6 @@ +/** + * Interfaces and classes related to Elasticsearch cluster information and management. + */ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.data.elasticsearch.core.cluster; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsIntegrationTests.java new file mode 100644 index 000000000..673109223 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsIntegrationTests.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import static org.assertj.core.api.Assertions.*; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.elasticsearch.core.ElasticsearchOperations; +import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchRestTemplateConfiguration; +import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Peter-Josef Meisch + */ +@SpringIntegrationTest +@ContextConfiguration(classes = { ElasticsearchRestTemplateConfiguration.class }) +public class ClusterOperationsIntegrationTests { + + @Autowired private ElasticsearchOperations operations; + private ClusterOperations clusterOperations; + + @BeforeEach + void setUp() { + clusterOperations = operations.cluster(); + } + + @Test // #1390 + @DisplayName("should return cluster health information") + void shouldReturnClusterHealthInformation() { + + ClusterHealth clusterHealth = clusterOperations.health(); + + List allowedStates = Arrays.asList("GREEN", "YELLOW"); + assertThat(allowedStates).contains(clusterHealth.getStatus()); + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsTransportIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsTransportIntegrationTests.java new file mode 100644 index 000000000..7219eeaa5 --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ClusterOperationsTransportIntegrationTests.java @@ -0,0 +1,25 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import org.springframework.data.elasticsearch.junit.jupiter.ElasticsearchTemplateConfiguration; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Peter-Josef Meisch + */ +@ContextConfiguration(classes = { ElasticsearchTemplateConfiguration.class }) +public class ClusterOperationsTransportIntegrationTests extends ClusterOperationsIntegrationTests {} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperationsIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperationsIntegrationTests.java new file mode 100644 index 000000000..4ff7b89fc --- /dev/null +++ b/src/test/java/org/springframework/data/elasticsearch/core/cluster/ReactiveClusterOperationsIntegrationTests.java @@ -0,0 +1,62 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.cluster; + +import static org.assertj.core.api.Assertions.*; + +import reactor.test.StepVerifier; + +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations; +import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration; +import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest; +import org.springframework.test.context.ContextConfiguration; + +/** + * @author Peter-Josef Meisch + */ +@SpringIntegrationTest +@ContextConfiguration(classes = { ReactiveElasticsearchRestTemplateConfiguration.class }) +public class ReactiveClusterOperationsIntegrationTests { + + @Autowired private ReactiveElasticsearchOperations operations; + private ReactiveClusterOperations clusterOperations; + + @BeforeEach + void setUp() { + clusterOperations = operations.cluster(); + } + + @Test // #1390 + @DisplayName("should return cluster health information") + void shouldReturnClusterHealthInformation() { + + List allowedStates = Arrays.asList("GREEN", "YELLOW"); + + clusterOperations.health() // + .as(StepVerifier::create) // + .consumeNextWith(clusterHealth -> { // + assertThat(allowedStates).contains(clusterHealth.getStatus()); // + }) // + .verifyComplete(); + } +}