From e44b7cb6793c7b0553c4036fca5291b52b24b6bc Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Tue, 18 Jan 2022 18:28:22 +0800 Subject: [PATCH 1/8] Implementing - Native query with ext section for SearchPlugin --- .../elasticsearch/core/RequestFactory.java | 5 +++++ .../core/query/NativeSearchQuery.java | 19 +++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index d7f340d41..0251bfeab 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -111,6 +111,7 @@ * @author Subhobrata Dey * @author Farid Faoudi * @author Peer Mueller + * @author Sijia Liu * @since 4.0 */ class RequestFactory { @@ -772,6 +773,10 @@ private void prepareNativeSearch(NativeSearchQuery query, SearchSourceBuilder so if (query.getSuggestBuilder() != null) { sourceBuilder.suggest(query.getSuggestBuilder()); } + + if (!isEmpty(query.getSearchExtBuilders())) { + sourceBuilder.ext(query.getSearchExtBuilders()); + } } @SuppressWarnings("rawtypes") diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/NativeSearchQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/NativeSearchQuery.java index 593fb6c5e..5104f2bd8 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/NativeSearchQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/NativeSearchQuery.java @@ -21,6 +21,7 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.mustache.SearchTemplateRequestBuilder; +import org.elasticsearch.search.SearchExtBuilder; import org.elasticsearch.search.aggregations.AbstractAggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.collapse.CollapseBuilder; @@ -41,6 +42,7 @@ * @author Jean-Baptiste Nizet * @author Martin Choraine * @author Peter-Josef Meisch + * @author Sijia Liu */ public class NativeSearchQuery extends BaseQuery { @@ -56,6 +58,7 @@ public class NativeSearchQuery extends BaseQuery { @Nullable private List indicesBoost; @Nullable private SearchTemplateRequestBuilder searchTemplate; @Nullable private SuggestBuilder suggestBuilder; + @Nullable private List searchExtBuilders; public NativeSearchQuery(@Nullable QueryBuilder query) { @@ -201,4 +204,20 @@ public void setSuggestBuilder(SuggestBuilder suggestBuilder) { public SuggestBuilder getSuggestBuilder() { return suggestBuilder; } + + public void setSearchExtBuilders(List searchExtBuilders) { + this.searchExtBuilders = searchExtBuilders; + } + + public void addSearchExtBuilder(SearchExtBuilder searchExtBuilder) { + if (searchExtBuilders == null) { + searchExtBuilders = new ArrayList<>(); + } + searchExtBuilders.add(searchExtBuilder); + } + + @Nullable + public List getSearchExtBuilders() { + return searchExtBuilders; + } } From 84eac20d037f3398aca0281d69f8ed19e038ea15 Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Fri, 21 Jan 2022 15:39:39 +0800 Subject: [PATCH 2/8] Implement support for reindex API [DATAES-955] #1529 --- .../DefaultReactiveElasticsearchClient.java | 16 + .../reactive/ReactiveElasticsearchClient.java | 65 +++ .../client/reactive/RequestCreator.java | 5 + .../client/util/RequestConverters.java | 11 +- .../core/DocumentOperations.java | 23 ++ .../core/ElasticsearchRestTemplate.java | 24 ++ .../core/ReactiveDocumentOperations.java | 23 ++ .../core/ReactiveElasticsearchTemplate.java | 26 ++ .../elasticsearch/core/RequestFactory.java | 132 +++++- .../elasticsearch/core/ResponseConverter.java | 44 ++ .../index/reindex/PostReindexRequest.java | 371 +++++++++++++++++ .../index/reindex/PostReindexResponse.java | 387 ++++++++++++++++++ .../core/index/reindex/Remote.java | 126 ++++++ .../core/ElasticsearchTemplateTests.java | 31 ++ ...ElasticsearchTemplateIntegrationTests.java | 36 ++ .../core/RequestFactoryTests.java | 73 +++- 16 files changed, 1373 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 001e44b1c..9ace9a169 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -83,11 +83,13 @@ import org.elasticsearch.client.GetAliasesResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.indices.*; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -143,6 +145,7 @@ * @author Brian Clozel * @author Farid Faoudi * @author George Popides + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -509,6 +512,19 @@ public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { .next(); } + @Override + public Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers) + .next(); + } + + @Override + public Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) + .next() + .map(TaskSubmissionResponse::getTask); + } + @Override public Mono execute(ReactiveElasticsearchClientCallback callback) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 030243851..7b5ca47dd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; @@ -76,6 +77,7 @@ * @author Henrique Amaral * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -713,6 +715,69 @@ default Mono bulk(BulkRequest bulkRequest) { */ Mono bulk(HttpHeaders headers, BulkRequest bulkRequest); + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + default Mono reindex(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return reindex(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + default Mono reindex(ReindexRequest reindexRequest){ + return reindex(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest); + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + default Mono submitReindexTask(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return submitReindexTask(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + default Mono submitReindexTask(ReindexRequest reindexRequest){ + return submitReindexTask(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest); /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 34fb05016..0dd71c5b4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -49,6 +49,7 @@ import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; @@ -289,4 +290,8 @@ default Function getIndex() { default Function clusterHealth() { return RequestConverters::clusterHealth; } + + default Function reindex() { return RequestConverters::reindex; } + + default Function submitReindex() { return RequestConverters::submitReindex; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 6d5ad016d..82e9df984 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) { return request; } - public static Request reindex(ReindexRequest reindexRequest) throws IOException { + public static Request reindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, true); } - static Request submitReindex(ReindexRequest reindexRequest) throws IOException { + public static Request submitReindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, false); } @@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool .withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) .withRequestsPerSecond(reindexRequest.getRequestsPerSecond()); + if(reindexRequest.getDestination().isRequireAlias()){ + params.putParam("require_alias", Boolean.TRUE.toString()); + } if (reindexRequest.getScrollTime() != null) { params.putParam("scroll", reindexRequest.getScrollTime()); } + params.putParam("slices", Integer.toString(reindexRequest.getSlices())); + if(reindexRequest.getMaxDocs() > -1){ + params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs())); + } request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index d795462c1..772989e2d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,6 +18,8 @@ import java.util.Collection; import java.util.List; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -34,6 +36,7 @@ * * @author Peter-Josef Meisch * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface DocumentOperations { @@ -322,4 +325,24 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * @since 4.2 */ ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return the reindex response + */ + PostReindexResponse reindex(PostReindexRequest postReindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return the task + */ + String submitReindexTask(PostReindexRequest postReindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 84277ce34..c2874847c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -61,7 +62,9 @@ import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -106,6 +109,7 @@ * @author Gyula Attila Csorogi * @author Massimiliano Poggi * @author Farid Faoudi + * @author Sijia Liu * @since 4.4 */ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @@ -277,6 +281,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) return ResponseConverter.byQueryResponseOf(bulkByScrollResponse); } + @Override + public PostReindexResponse reindex(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final BulkByScrollResponse bulkByScrollResponse = execute( + client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); + return ResponseConverter.postReindexResponseOf(bulkByScrollResponse); + } + + @Override + public String submitReindexTask(PostReindexRequest postReindexRequest) { + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return execute( + client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); + } + public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 995a82897..f04b7dbb2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,6 +40,7 @@ * @author Aleksei Arsenev * @author Roman Puchkovskiy * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -302,4 +305,24 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @since 4.2 */ Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return a {@link Mono} emitting the reindex response + */ + Mono reindex(PostReindexRequest postReindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return a {@link Mono} emitting the {@literal task}. + */ + Mono submitReindexTask(PostReindexRequest postReindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 387f6d3fa..2ac177fa2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.reactivestreams.Publisher; @@ -70,6 +71,8 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -105,6 +108,7 @@ * @author Russell Parry * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware { @@ -609,6 +613,28 @@ public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordin }); } + @Override + public Mono reindex(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::postReindexResponseOf); + }); + } + + @Override + public Mono submitReindexTask(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.submitReindexTask(reindexRequest))); + }); + } + @Override public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 0251bfeab..948c73e5c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -15,19 +15,15 @@ */ package org.springframework.data.elasticsearch.core; +import static org.elasticsearch.core.TimeValue.*; import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.elasticsearch.index.reindex.RemoteInfo.*; +import static org.elasticsearch.script.Script.*; import static org.springframework.util.CollectionUtils.*; +import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.elasticsearch.action.DocWriteRequest; @@ -58,6 +54,7 @@ import org.elasticsearch.client.indices.IndexTemplatesExistRequest; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoDistance; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.core.TimeValue; @@ -66,6 +63,8 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -73,6 +72,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.QueryRescoreMode; import org.elasticsearch.search.rescore.QueryRescorerBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; @@ -81,6 +81,8 @@ import org.elasticsearch.search.sort.SortMode; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; @@ -91,7 +93,12 @@ import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Source; +import org.springframework.data.elasticsearch.core.index.reindex.Remote; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Dest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Slice; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -387,6 +394,113 @@ public DeleteIndexTemplateRequest deleteIndexTemplateRequest(DeleteTemplateReque return new DeleteIndexTemplateRequest(deleteTemplateRequest.getTemplateName()); } + public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ + final ReindexRequest reindexRequest = new ReindexRequest(); + if(postReindexRequest.getConflicts() != null){ + reindexRequest.setConflicts(postReindexRequest.getConflicts()); + } + if(postReindexRequest.getMaxDocs() != null){ + reindexRequest.setMaxDocs(postReindexRequest.getMaxDocs()); + } + // region source build + final Source source = postReindexRequest.getSource(); + reindexRequest.setSourceIndices(source.getIndexes().toArray(new String[]{})); + if(source.getQuery() != null && source.getRemote() != null){ + reindexRequest.setSourceQuery(source.getQuery()); + } + if(source.getSize() != null){ + reindexRequest.setSourceBatchSize(source.getSize()); + } + + if(source.getRemote() != null){ + Remote remote = source.getRemote(); + QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : source.getQuery(); + BytesReference query = null; + try { + XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); + query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (IOException e) { + // ignore it + } + reindexRequest.setRemoteInfo(new RemoteInfo( + remote.getScheme(), + remote.getHost(), + remote.getPort(), + remote.getPathPrefix(), + Objects.requireNonNull(query), + remote.getUsername(), + remote.getPassword(), + Collections.emptyMap(), + remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT : timeValueSeconds(remote.getSocketTimeout().getSeconds()), + remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT : timeValueSeconds(remote.getConnectTimeout().getSeconds()) + )); + } + + final Slice slice = source.getSlice(); + if(slice != null){ + reindexRequest.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); + } + final SourceFilter sourceFilter = source.getSourceFilter(); + if(sourceFilter != null){ + reindexRequest.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); + } + // endregion + + // region dest build + final Dest dest = postReindexRequest.getDest(); + reindexRequest.setDestIndex(dest.getIndex()) + .setDestRouting(dest.getRouting()) + .setDestPipeline(dest.getPipeline()); + + final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); + if(versionType != null){ + reindexRequest.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); + } + final IndexQuery.OpType opType = dest.getOpType(); + if(opType != null){ + reindexRequest.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); + } + // endregion + + // region script build + final PostReindexRequest.Script script = postReindexRequest.getScript(); + if(script != null){ + reindexRequest.setScript(new Script(DEFAULT_SCRIPT_TYPE, + script.getLang(), + script.getSource(), + Collections.emptyMap() + )); + } + // endregion + + // region query parameters build + final Duration timeout = postReindexRequest.getTimeout(); + if(timeout != null){ + reindexRequest.setTimeout(timeValueSeconds(timeout.getSeconds())); + } + if(postReindexRequest.getRefresh() != null){ + reindexRequest.setRefresh(postReindexRequest.getRefresh()); + } + if(postReindexRequest.getRequireAlias() != null){ + reindexRequest.setRequireAlias(postReindexRequest.getRequireAlias()); + } + if(postReindexRequest.getRequestsPerSecond() != null){ + reindexRequest.setRequestsPerSecond(postReindexRequest.getRequestsPerSecond()); + } + final Duration scroll = postReindexRequest.getScroll(); + if(scroll != null){ + reindexRequest.setScroll(timeValueSeconds(scroll.getSeconds())); + } + if(postReindexRequest.getWaitForActiveShards() != null){ + reindexRequest.setWaitForActiveShards(ActiveShardCount.parseString(postReindexRequest.getWaitForActiveShards())); + } + if(postReindexRequest.getSlices() != null){ + reindexRequest.setSlices(postReindexRequest.getSlices()); + } + // endregion + return reindexRequest; + } + // endregion // region delete diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index 8f9d7928e..e4be000b1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -45,6 +45,7 @@ import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.TemplateData; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -54,6 +55,7 @@ * * @author George Popides * @author Peter-Josef Meisch + * @author Sijia Liu * @since 4.2 */ public class ResponseConverter { @@ -384,4 +386,46 @@ public static ByQueryResponse.SearchFailure byQueryResponseSearchFailureOf( } // endregion + + // region postReindexResponse + + public static PostReindexResponse postReindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + final List failures = bulkByScrollResponse.getBulkFailures() // + .stream() // + .map(ResponseConverter::postReindexResponseFailureOf) // + .collect(Collectors.toList()); // + + return PostReindexResponse.builder() // + .withTook(bulkByScrollResponse.getTook().getMillis()) // + .withTimedOut(bulkByScrollResponse.isTimedOut()) // + .withTotal(bulkByScrollResponse.getTotal()) // + .withUpdated(bulkByScrollResponse.getUpdated()) // + .withDeleted(bulkByScrollResponse.getDeleted()) // + .withBatches(bulkByScrollResponse.getBatches()) // + .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) // + .withNoops(bulkByScrollResponse.getNoops()) // + .withBulkRetries(bulkByScrollResponse.getBulkRetries()) // + .withSearchRetries(bulkByScrollResponse.getSearchRetries()) // + .withThrottledMillis(bulkByScrollResponse.getStatus().getThrottled().getMillis()) // + .withRequestsPerSecond(bulkByScrollResponse.getStatus().getRequestsPerSecond()) // + .withThrottledUntilMillis(bulkByScrollResponse.getStatus().getThrottledUntil().getMillis()) // + .withFailures(failures) // + .build(); // + + } + + public static PostReindexResponse.Failure postReindexResponseFailureOf(BulkItemResponse.Failure failure) { + return PostReindexResponse.Failure.builder() // + .withIndex(failure.getIndex()) // + .withType(failure.getType()) // + .withId(failure.getId()) // + .withStatus(failure.getStatus().getStatus()) // + .withAborted(failure.isAborted()) // + .withCause(failure.getCause()) // + .withSeqNo(failure.getSeqNo()) // + .withTerm(failure.getTerm()) // + .build(); // + } + + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java new file mode 100644 index 000000000..64e26e4d3 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java @@ -0,0 +1,371 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.elasticsearch.index.query.QueryBuilder; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.SourceFilter; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.springframework.util.CollectionUtils.*; + +/** + * Request to reindex some documents from one index to another. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @author Sijia Liu + */ +public class PostReindexRequest { + + // Request body + private final Source source; + private final Dest dest; + @Nullable private final Integer maxDocs; + @Nullable private final String conflicts; + @Nullable private final Script script; + + // Query parameters + @Nullable private final Duration timeout; + @Nullable private final Boolean requireAlias; + @Nullable private final Boolean refresh; + @Nullable private final String waitForActiveShards; + @Nullable private final Integer requestsPerSecond; + @Nullable private final Duration scroll; + @Nullable private final Integer slices; + + PostReindexRequest(@Nullable Integer maxDocs, @Nullable String conflicts, Source source, Dest dest, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + this.maxDocs = maxDocs; + this.conflicts = conflicts; + this.source = source; + this.dest = dest; + this.script = script; + this.timeout = timeout; + this.requireAlias = requireAlias; + this.refresh = refresh; + this.waitForActiveShards = waitForActiveShards; + this.requestsPerSecond = requestsPerSecond; + this.scroll = scroll; + this.slices = slices; + } + + @Nullable + public Integer getMaxDocs() { + return maxDocs; + } + + public Source getSource() { + return source; + } + + public Dest getDest() { + return dest; + } + + @Nullable + public Script getScript() { + return script; + } + + @Nullable + public String getConflicts() { + return conflicts; + } + + @Nullable + public Boolean getRequireAlias() { + return requireAlias; + } + + @Nullable + public Duration getTimeout() { + return timeout; + } + + @Nullable + public Boolean getRefresh() { + return refresh; + } + + @Nullable + public String getWaitForActiveShards() { + return waitForActiveShards; + } + + @Nullable + public Integer getRequestsPerSecond() { + return requestsPerSecond; + } + + @Nullable + public Duration getScroll() { + return scroll; + } + + @Nullable + public Integer getSlices() { + return slices; + } + + public static PostReindexRequestBuilder builder(String sourceIndex, String destIndex) { + return new PostReindexRequestBuilder(sourceIndex, destIndex); + } + + public static class Source { + private final List indexes = new ArrayList<>(); + @Nullable private QueryBuilder query; + @Nullable private Remote remote; + @Nullable private Slice slice; + @Nullable private Integer size; + @Nullable private SourceFilter sourceFilter; + + public List getIndexes() { + return indexes; + } + + @Nullable + public Remote getRemote() { + return remote; + } + + @Nullable + public QueryBuilder getQuery() { + return query; + } + + @Nullable + public Integer getSize() { + return size; + } + + @Nullable + public Slice getSlice() { + return slice; + } + + @Nullable + public SourceFilter getSourceFilter() { + return sourceFilter; + } + } + + public static class Slice { + private final Integer id; + private final Integer max; + + Slice(Integer id, Integer max) { + this.id = id; + this.max = max; + } + + public Integer getId() { + return id; + } + + public Integer getMax() { + return max; + } + } + + public static class Dest { + + private final String index; + @Nullable private String pipeline; + @Nullable private String routing; + @Nullable private Document.VersionType versionType; + @Nullable private IndexQuery.OpType opType; + + Dest(String index) { + Assert.notNull(index, "dest index must not be null"); + this.index = index; + } + + public String getIndex() { + return index; + } + + @Nullable + public Document.VersionType getVersionType() { + return versionType; + } + + @Nullable + public IndexQuery.OpType getOpType() { + return opType; + } + + @Nullable + public String getPipeline() { + return pipeline; + } + + @Nullable + public String getRouting() { + return routing; + } + } + + public static class Script { + private final String source; + private final String lang; + + Script(String source, String lang) { + this.source = source; + this.lang = lang; + } + + public String getSource() { + return source; + } + + public String getLang() { + return lang; + } + } + + public static final class PostReindexRequestBuilder { + + private final Source source; + private final Dest dest; + @Nullable private Integer maxDocs; + @Nullable private String conflicts; + @Nullable private Script script; + @Nullable private Duration timeout; + @Nullable private Boolean requireAlias; + @Nullable private Boolean refresh; + @Nullable private String waitForActiveShards; + @Nullable private Integer requestsPerSecond; + @Nullable private Duration scroll; + @Nullable private Integer slices; + + public PostReindexRequestBuilder(String sourceIndex, String destIndex) { + this.source = new Source(); + this.source.indexes.add(sourceIndex); + this.dest = new Dest(destIndex); + } + + // region setter + public PostReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { + this.maxDocs = maxDocs; + return this; + } + + public PostReindexRequestBuilder withConflicts(String conflicts) { + this.conflicts = conflicts; + return this; + } + + public PostReindexRequestBuilder addSourceIndex(String sourceIndex) { + Assert.notNull(sourceIndex, "source index must not be null"); + this.source.indexes.add(sourceIndex); + return this; + } + + public PostReindexRequestBuilder withSourceIndexes(List sourceIndexes) { + if (!isEmpty(sourceIndexes)) { + clearSourceIndexes(); + this.source.indexes.addAll(sourceIndexes); + } + return this; + } + + public PostReindexRequestBuilder clearSourceIndexes() { + this.source.indexes.clear(); + return this; + } + + public PostReindexRequestBuilder withSourceQuery(QueryBuilder query) { + this.source.query = query; + return this; + } + + public PostReindexRequestBuilder withSourceSlice(int id, int max){ + this.source.slice = new Slice(id, max); + return this; + } + + public PostReindexRequestBuilder withSourceRemote(Remote remote) { + this.source.remote = remote; + return this; + } + + public PostReindexRequestBuilder withSourceSize(int size) { + this.source.size = size; + return this; + } + + public PostReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ + this.source.sourceFilter = sourceFilter; + return this; + } + + public PostReindexRequestBuilder withDestPipeline(String pipelineName){ + this.dest.pipeline = pipelineName; + return this; + } + + public PostReindexRequestBuilder withDestRouting(String routing){ + this.dest.routing = routing; + return this; + } + + public PostReindexRequestBuilder withDestVersionType(Document.VersionType versionType) { + this.dest.versionType = versionType; + return this; + } + + public PostReindexRequestBuilder withDestOpType(IndexQuery.OpType opType) { + this.dest.opType = opType; + return this; + } + + + public PostReindexRequestBuilder withScript(String source, String lang) { + this.script = new Script(source, lang); + return this; + } + + public PostReindexRequestBuilder withTimeout(Duration timeout){ + this.timeout = timeout; + return this; + } + + public PostReindexRequestBuilder withRequireAlias(boolean requireAlias){ + this.requireAlias = requireAlias; + return this; + } + + public PostReindexRequestBuilder withRefresh(boolean refresh){ + this.refresh = refresh; + return this; + } + + public PostReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ + this.waitForActiveShards = waitForActiveShards; + return this; + } + + public PostReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public PostReindexRequestBuilder withScroll(Duration scroll){ + this.scroll = scroll; + return this; + } + + public PostReindexRequestBuilder withSlices(int slices){ + this.slices = slices; + return this; + } + // endregion + + public PostReindexRequest build() { + return new PostReindexRequest(maxDocs, conflicts, source, dest, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java new file mode 100644 index 000000000..820e7588c --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java @@ -0,0 +1,387 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.springframework.lang.Nullable; + +import java.util.Collections; +import java.util.List; + +/** + * Response of reindex request. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) + * + * @author Onizuka + */ +public class PostReindexResponse { + + private final long took; + private final boolean timedOut; + private final long total; + private final long updated; + private final long deleted; + private final int batches; + private final long versionConflicts; + private final long noops; + private final long bulkRetries; + private final long searchRetries; + private final long throttledMillis; + private final double requestsPerSecond; + private final long throttledUntilMillis; + private final List failures; + + private PostReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + long versionConflicts, long noops, long bulkRetries, long searchRetries, + long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { + this.took = took; + this.timedOut = timedOut; + this.total = total; + this.updated = updated; + this.deleted = deleted; + this.batches = batches; + this.versionConflicts = versionConflicts; + this.noops = noops; + this.bulkRetries = bulkRetries; + this.searchRetries = searchRetries; + this.throttledMillis = throttledMillis; + this.requestsPerSecond = requestsPerSecond; + this.throttledUntilMillis = throttledUntilMillis; + this.failures = failures; + } + + /** + * The number of milliseconds from start to end of the whole operation. + */ + public long getTook() { + return took; + } + + /** + * Did any of the sub-requests that were part of this request timeout? + */ + public boolean isTimedOut() { + return timedOut; + } + + /** + * The number of documents that were successfully processed. + */ + public long getTotal() { + return total; + } + + /** + * The number of documents that were successfully updated. + */ + public long getUpdated() { + return updated; + } + + /** + * The number of documents that were successfully deleted. + */ + public long getDeleted() { + return deleted; + } + + /** + * The number of scroll responses pulled back by the update by query. + */ + public int getBatches() { + return batches; + } + + /** + * The number of version conflicts that the update by query hit. + */ + public long getVersionConflicts() { + return versionConflicts; + } + + /** + * The number of documents that were ignored because the script used for the update by query returned a noop value for + * ctx.op. + */ + public long getNoops() { + return noops; + } + + /** + * The number of times that the request had retry bulk actions. + */ + public long getBulkRetries() { + return bulkRetries; + } + + /** + * The number of times that the request had retry search actions. + */ + public long getSearchRetries() { + return searchRetries; + } + + /** + * Number of milliseconds the request slept to conform to requests_per_second. + */ + public long getThrottledMillis() { + return throttledMillis; + } + + /** + * The number of requests per second effectively executed during the reindex. + */ + public double getRequestsPerSecond() { + return requestsPerSecond; + } + + /** + * This field should always be equal to zero in a _reindex response. + * It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) + * a throttled request will be executed again in order to conform to requests_per_second. + */ + public long getThrottledUntilMillis() { + return throttledUntilMillis; + } + + /** + * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true + * (the default). + */ + public List getFailures() { + return failures; + } + + /** + * Create a new {@link PostReindexResponse} to build {@link PostReindexResponse} + * + * @return a new {@link PostReindexResponse} to build {@link PostReindexResponse} + */ + public static PostReindexResponseBuilder builder() { + return new PostReindexResponseBuilder(); + } + + public static class Failure { + + @Nullable private final String index; + @Nullable private final String type; + @Nullable private final String id; + @Nullable private final Exception cause; + @Nullable private final Integer status; + @Nullable private final Long seqNo; + @Nullable private final Long term; + @Nullable private final Boolean aborted; + + private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, + @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { + this.index = index; + this.type = type; + this.id = id; + this.cause = cause; + this.status = status; + this.seqNo = seqNo; + this.term = term; + this.aborted = aborted; + } + + @Nullable + public String getIndex() { + return index; + } + + @Nullable + public String getType() { + return type; + } + + @Nullable + public String getId() { + return id; + } + + @Nullable + public Exception getCause() { + return cause; + } + + @Nullable + public Integer getStatus() { + return status; + } + + @Nullable + public Long getSeqNo() { + return seqNo; + } + + @Nullable + public Long getTerm() { + return term; + } + + @Nullable + public Boolean getAborted() { + return aborted; + } + + /** + * Create a new {@link Failure.FailureBuilder} to build {@link Failure} + * + * @return a new {@link Failure.FailureBuilder} to build {@link Failure} + */ + public static Failure.FailureBuilder builder() { + return new Failure.FailureBuilder(); + } + + /** + * Builder for {@link Failure} + */ + public static final class FailureBuilder { + @Nullable private String index; + @Nullable private String type; + @Nullable private String id; + @Nullable private Exception cause; + @Nullable private Integer status; + @Nullable private Long seqNo; + @Nullable private Long term; + @Nullable private Boolean aborted; + + private FailureBuilder() {} + + public Failure.FailureBuilder withIndex(String index) { + this.index = index; + return this; + } + + public Failure.FailureBuilder withType(String type) { + this.type = type; + return this; + } + + public Failure.FailureBuilder withId(String id) { + this.id = id; + return this; + } + + public Failure.FailureBuilder withCause(Exception cause) { + this.cause = cause; + return this; + } + + public Failure.FailureBuilder withStatus(Integer status) { + this.status = status; + return this; + } + + public Failure.FailureBuilder withSeqNo(Long seqNo) { + this.seqNo = seqNo; + return this; + } + + public Failure.FailureBuilder withTerm(Long term) { + this.term = term; + return this; + } + + public Failure.FailureBuilder withAborted(Boolean aborted) { + this.aborted = aborted; + return this; + } + + public Failure build() { + return new Failure(index, type, id, cause, status, seqNo, term, aborted); + } + } + } + + public static final class PostReindexResponseBuilder { + private long took; + private boolean timedOut; + private long total; + private long updated; + private long deleted; + private int batches; + private long versionConflicts; + private long noops; + private long bulkRetries; + private long searchRetries; + private long throttledMillis; + private double requestsPerSecond; + private long throttledUntilMillis; + private List failures = Collections.emptyList(); + + private PostReindexResponseBuilder() {} + + public PostReindexResponseBuilder withTook(long took) { + this.took = took; + return this; + } + + public PostReindexResponseBuilder withTimedOut(boolean timedOut) { + this.timedOut = timedOut; + return this; + } + + public PostReindexResponseBuilder withTotal(long total) { + this.total = total; + return this; + } + + public PostReindexResponseBuilder withUpdated(long updated) { + this.updated = updated; + return this; + } + + public PostReindexResponseBuilder withDeleted(long deleted) { + this.deleted = deleted; + return this; + } + + public PostReindexResponseBuilder withBatches(int batches) { + this.batches = batches; + return this; + } + + public PostReindexResponseBuilder withVersionConflicts(long versionConflicts) { + this.versionConflicts = versionConflicts; + return this; + } + + public PostReindexResponseBuilder withNoops(long noops) { + this.noops = noops; + return this; + } + + public PostReindexResponseBuilder withBulkRetries(long bulkRetries) { + this.bulkRetries = bulkRetries; + return this; + } + + public PostReindexResponseBuilder withSearchRetries(long searchRetries) { + this.searchRetries = searchRetries; + return this; + } + + public PostReindexResponseBuilder withThrottledMillis(long throttledMillis){ + this.throttledMillis = throttledMillis; + return this; + } + + public PostReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public PostReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ + this.throttledUntilMillis = throttledUntilMillis; + return this; + } + + public PostReindexResponseBuilder withFailures(List failures) { + this.failures = failures; + return this; + } + + public PostReindexResponse build() { + return new PostReindexResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttledMillis, requestsPerSecond, throttledUntilMillis, failures); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java new file mode 100644 index 000000000..320d1042d --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java @@ -0,0 +1,126 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.elasticsearch.index.reindex.RemoteInfo; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; + +/** + * Remote info {@link RemoteInfo} + * + * @author Onizuka + */ +public class Remote { + private final String scheme; + private final String host; + private final int port; + + @Nullable private final String pathPrefix; + @Nullable private final String username; + @Nullable private final String password; + @Nullable private final Duration socketTimeout; + @Nullable private final Duration connectTimeout; + + private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { + + Assert.notNull(scheme, "scheme must not be null"); + Assert.notNull(host, "host must not be null"); + + this.scheme = scheme; + this.host = host; + this.port = port; + this.pathPrefix = pathPrefix; + this.username = username; + this.password = password; + this.socketTimeout = socketTimeout; + this.connectTimeout = connectTimeout; + } + + public String getHost() { + return host; + } + + @Nullable + public String getUsername() { + return username; + } + + @Nullable + public String getPassword() { + return password; + } + + @Nullable + public Duration getSocketTimeout() { + return socketTimeout; + } + + @Nullable + public Duration getConnectTimeout() { + return connectTimeout; + } + + public String getScheme() { + return scheme; + } + + public int getPort() { + return port; + } + + @Nullable + public String getPathPrefix() { + return pathPrefix; + } + + public static RemoteBuilder builder(String scheme, String host, int port){ + return new RemoteBuilder(scheme, host, port); + } + + public static class RemoteBuilder{ + private final String scheme; + private final String host; + private final int port; + @Nullable private String pathPrefix; + @Nullable private String username; + @Nullable private String password; + @Nullable private Duration socketTimeout; + @Nullable private Duration connectTimeout; + + public RemoteBuilder(String scheme, String host, int port) { + this.scheme = scheme; + this.host = host; + this.port = port; + } + + public RemoteBuilder withPathPrefix(String pathPrefix){ + this.pathPrefix = pathPrefix; + return this; + } + + public RemoteBuilder withUsername(String username){ + this.username = username; + return this; + } + + public RemoteBuilder withPassword(String password){ + this.password = password; + return this; + } + + public RemoteBuilder withSocketTimeout(Duration socketTimeout){ + this.socketTimeout = socketTimeout; + return this; + } + + public RemoteBuilder withConnectTimeout(Duration connectTimeout){ + this.connectTimeout = connectTimeout; + return this; + } + + public Remote build(){ + return new Remote(scheme, host, port , pathPrefix, username, password, socketTimeout, connectTimeout); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index d75218eff..82ebfd958 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -83,6 +83,8 @@ import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.Setting; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.ScriptField; @@ -126,6 +128,7 @@ * @author Subhobrata Dey * @author Farid Faoudi * @author Peer Mueller + * @author Sijia Liu */ @SpringIntegrationTest public abstract class ElasticsearchTemplateTests { @@ -3646,6 +3649,34 @@ void shouldNotErrorOnSortWithUnmappedFieldAndUnmappedTypeSettings() { operations.search(query, SampleEntity.class); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + String documentId = nextIdAsString(); + SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("abc").build(); + operations.save(sampleEntity); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + + final PostReindexResponse reindex = operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()); + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + assertThat(reindex.getTotal()).isEqualTo(1); + assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + String task = operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()); + // Maybe there should be a task api to detect whether the task exists + assertThat(task).isNotBlank(); + } + // region entities @Document(indexName = "#{@indexNameProvider.indexName()}") @Setting(shards = 1, replicas = 0, refreshInterval = "-1") diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 0b34f9fa6..e31a61595 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -19,7 +19,11 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; +import static org.springframework.data.elasticsearch.utils.IdGenerator.*; +import org.elasticsearch.common.Strings; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -1188,6 +1192,38 @@ void shouldReturnMonoOfReactiveSearchHits() { .verifyComplete(); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + String documentId = nextIdAsString(); + ElasticsearchTemplateTests.SampleEntity sampleEntity = ElasticsearchTemplateTests.SampleEntity.builder().id(documentId).message("abc").build(); + operations.save(sampleEntity).block(); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()) + .as(StepVerifier::create) + .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) + .verifyComplete(); + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + operations.count(searchQuery, SampleEntity.class, IndexCoordinates.of(destIndexName)) + .as(StepVerifier::create) + .expectNext(1L) + .verifyComplete(); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()) + .as(StepVerifier::create) + .consumeNextWith(task -> assertThat(task).isNotBlank()) + .verifyComplete(); + } // endregion // region Helper functions diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index f513abc97..c093efe27 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder.FilterFunctionBuilder; import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentType; import org.json.JSONException; @@ -48,6 +49,7 @@ import org.springframework.data.annotation.Id; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.geo.GeoPoint; @@ -55,18 +57,12 @@ import org.springframework.data.elasticsearch.core.index.AliasActionParameters; import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.Remote; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.query.Criteria; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.RescorerQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; -import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.lang.Nullable; /** @@ -74,6 +70,7 @@ * @author Roman Puchkovskiy * @author Peer Mueller * @author vdisk + * @author Sijia Liu */ @SuppressWarnings("ConstantConditions") @ExtendWith(MockitoExtension.class) @@ -562,6 +559,64 @@ void shouldSetStoredFieldsOnSearchRequest() { .isEqualTo(Arrays.asList("last-name", "current-location")); } + @Test // #1529 + void shouldCreatePostReindexRequest() throws IOException, JSONException { + final String expected = "{\n" + + " \"source\":{\n" + + " \"remote\":{\n" + + " \"username\":\"admin\",\n" + + " \"password\":\"admin\",\n" + + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + + " \"socket_timeout\":\"30s\",\n" + + " \"connect_timeout\":\"30s\"\n" + + " },\n" + + " \"index\":[\"source_1\",\"source_2\"],\n" + + " \"size\":5,\n" + + " \"query\":{\"match_all\":{\"boost\":1.0}},\n" + + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + + " \"slice\":{\"id\":1,\"max\":20}\n" + + " },\n" + + " \"dest\":{\n" + + " \"index\":\"destination\",\n" + + " \"routing\":\"routing\",\n" + + " \"op_type\":\"create\",\n" + + " \"pipeline\":\"pipeline\",\n" + + " \"version_type\":\"external\"\n" + + " },\n" + + " \"max_docs\":10,\n" + + " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + + " \"conflicts\":\"proceed\"\n" + + "}"; + + Remote remote = Remote.builder("http", "localhost",9200) + .withPathPrefix("elasticsearch") + .withUsername("admin") + .withPassword("admin") + .withConnectTimeout(Duration.ofSeconds(30)) + .withSocketTimeout(Duration.ofSeconds(30)).build(); + + PostReindexRequest postReindexRequest = PostReindexRequest.builder("source_1", "destination") + .addSourceIndex("source_2") + .withConflicts("proceed") + .withMaxDocs(10) + .withSourceQuery(QueryBuilders.matchAllQuery()) + .withSourceSize(5) + .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()) + .withSourceRemote(remote) + .withSourceSlice(1,20) + .withDestOpType(IndexQuery.OpType.CREATE) + .withDestVersionType(Document.VersionType.EXTERNAL) + .withDestPipeline("pipeline") + .withDestRouting("routing") + .withScript("Math.max(1,2)", "java") + .build(); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final String json = requestToString(reindexRequest); + + assertEquals(expected, json, false); + } + // region entities static class Person { @Nullable From 4f7a679955990b9da60f595d3bb6d70a55d51f2f Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Fri, 21 Jan 2022 16:36:13 +0800 Subject: [PATCH 3/8] Use IndexCoordinates instead of String indexName --- .../elasticsearch/core/RequestFactory.java | 4 +- .../index/reindex/PostReindexRequest.java | 44 +++++-------------- .../index/reindex/PostReindexResponse.java | 2 +- .../core/index/reindex/Remote.java | 2 +- .../core/ElasticsearchTemplateTests.java | 8 +++- ...ElasticsearchTemplateIntegrationTests.java | 13 ++++-- .../core/RequestFactoryTests.java | 4 +- 7 files changed, 33 insertions(+), 44 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 948c73e5c..356979169 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -404,7 +404,7 @@ public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ } // region source build final Source source = postReindexRequest.getSource(); - reindexRequest.setSourceIndices(source.getIndexes().toArray(new String[]{})); + reindexRequest.setSourceIndices(source.getIndexes().getIndexNames()); if(source.getQuery() != null && source.getRemote() != null){ reindexRequest.setSourceQuery(source.getQuery()); } @@ -448,7 +448,7 @@ public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ // region dest build final Dest dest = postReindexRequest.getDest(); - reindexRequest.setDestIndex(dest.getIndex()) + reindexRequest.setDestIndex(dest.getIndex().getIndexName()) .setDestRouting(dest.getRouting()) .setDestPipeline(dest.getPipeline()); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java index 64e26e4d3..251b28fcb 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java @@ -2,16 +2,13 @@ import org.elasticsearch.index.query.QueryBuilder; import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.IndexQuery; import org.springframework.data.elasticsearch.core.query.SourceFilter; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import java.time.Duration; -import java.util.ArrayList; -import java.util.List; - -import static org.springframework.util.CollectionUtils.*; /** * Request to reindex some documents from one index to another. @@ -110,19 +107,22 @@ public Integer getSlices() { return slices; } - public static PostReindexRequestBuilder builder(String sourceIndex, String destIndex) { + public static PostReindexRequestBuilder builder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { return new PostReindexRequestBuilder(sourceIndex, destIndex); } public static class Source { - private final List indexes = new ArrayList<>(); + private final IndexCoordinates indexes; @Nullable private QueryBuilder query; @Nullable private Remote remote; @Nullable private Slice slice; @Nullable private Integer size; @Nullable private SourceFilter sourceFilter; - public List getIndexes() { + Source(IndexCoordinates indexes){ + this.indexes = indexes; + } + public IndexCoordinates getIndexes() { return indexes; } @@ -172,18 +172,18 @@ public Integer getMax() { public static class Dest { - private final String index; + private final IndexCoordinates index; @Nullable private String pipeline; @Nullable private String routing; @Nullable private Document.VersionType versionType; @Nullable private IndexQuery.OpType opType; - Dest(String index) { + Dest(IndexCoordinates index) { Assert.notNull(index, "dest index must not be null"); this.index = index; } - public String getIndex() { + public IndexCoordinates getIndex() { return index; } @@ -241,9 +241,8 @@ public static final class PostReindexRequestBuilder { @Nullable private Duration scroll; @Nullable private Integer slices; - public PostReindexRequestBuilder(String sourceIndex, String destIndex) { - this.source = new Source(); - this.source.indexes.add(sourceIndex); + public PostReindexRequestBuilder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + this.source = new Source(sourceIndex); this.dest = new Dest(destIndex); } @@ -258,25 +257,6 @@ public PostReindexRequestBuilder withConflicts(String conflicts) { return this; } - public PostReindexRequestBuilder addSourceIndex(String sourceIndex) { - Assert.notNull(sourceIndex, "source index must not be null"); - this.source.indexes.add(sourceIndex); - return this; - } - - public PostReindexRequestBuilder withSourceIndexes(List sourceIndexes) { - if (!isEmpty(sourceIndexes)) { - clearSourceIndexes(); - this.source.indexes.addAll(sourceIndexes); - } - return this; - } - - public PostReindexRequestBuilder clearSourceIndexes() { - this.source.indexes.clear(); - return this; - } - public PostReindexRequestBuilder withSourceQuery(QueryBuilder query) { this.source.query = query; return this; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java index 820e7588c..0ec336cca 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java @@ -9,7 +9,7 @@ * Response of reindex request. * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) * - * @author Onizuka + * @author Sijia Liu */ public class PostReindexResponse { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java index 320d1042d..bf8a2e231 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java @@ -9,7 +9,7 @@ /** * Remote info {@link RemoteInfo} * - * @author Onizuka + * @author Sijia Liu */ public class Remote { private final String scheme; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 82ebfd958..174aa1ea9 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -3660,7 +3660,9 @@ void shouldWorkReindexForExistingIndex() { String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final PostReindexResponse reindex = operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()); + final PostReindexRequest postReindexRequest = PostReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .withRefresh(true).build(); + final PostReindexResponse reindex = operations.reindex(postReindexRequest); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); assertThat(reindex.getTotal()).isEqualTo(1); assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); @@ -3672,7 +3674,9 @@ void shouldWorkSubmitReindexTask(){ indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - String task = operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()); + final PostReindexRequest postReindexRequest = PostReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); + String task = operations.submitReindexTask(postReindexRequest); // Maybe there should be a task api to detect whether the task exists assertThat(task).isNotBlank(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index e31a61595..b126ef6be 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -21,9 +21,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.*; import static org.springframework.data.elasticsearch.utils.IdGenerator.*; -import org.elasticsearch.common.Strings; import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -1202,7 +1200,11 @@ void shouldWorkReindexForExistingIndex() { indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()) + final PostReindexRequest postReindexRequest = PostReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .withRefresh(true) + .build(); + operations.reindex(postReindexRequest) .as(StepVerifier::create) .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) .verifyComplete(); @@ -1219,7 +1221,10 @@ void shouldWorkSubmitReindexTask(){ indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()) + final PostReindexRequest postReindexRequest = PostReindexRequest + .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + .build(); + operations.submitReindexTask(postReindexRequest) .as(StepVerifier::create) .consumeNextWith(task -> assertThat(task).isNotBlank()) .verifyComplete(); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index c093efe27..f916ce599 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -595,8 +595,8 @@ void shouldCreatePostReindexRequest() throws IOException, JSONException { .withConnectTimeout(Duration.ofSeconds(30)) .withSocketTimeout(Duration.ofSeconds(30)).build(); - PostReindexRequest postReindexRequest = PostReindexRequest.builder("source_1", "destination") - .addSourceIndex("source_2") + PostReindexRequest postReindexRequest = PostReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"), + IndexCoordinates.of("destination")) .withConflicts("proceed") .withMaxDocs(10) .withSourceQuery(QueryBuilders.matchAllQuery()) From 3744028a1d22b905fc075b074e4e603ef31491fa Mon Sep 17 00:00:00 2001 From: Onizuka Date: Mon, 24 Jan 2022 10:52:35 +0800 Subject: [PATCH 4/8] Update src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java rename help method Co-authored-by: Peter-Josef Meisch --- .../data/elasticsearch/core/ResponseConverter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index e4be000b1..3a1d0f2ca 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -389,7 +389,7 @@ public static ByQueryResponse.SearchFailure byQueryResponseSearchFailureOf( // region postReindexResponse - public static PostReindexResponse postReindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + public static PostReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ final List failures = bulkByScrollResponse.getBulkFailures() // .stream() // .map(ResponseConverter::postReindexResponseFailureOf) // From d3243fc9fb5af21002c706ab4b93b3ca8d8067a7 Mon Sep 17 00:00:00 2001 From: Onizuka Date: Mon, 24 Jan 2022 10:53:31 +0800 Subject: [PATCH 5/8] Update src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java independent of the client implementation Co-authored-by: Peter-Josef Meisch --- .../core/ReactiveElasticsearchTemplateIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index b126ef6be..49d748bb4 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -1208,7 +1208,7 @@ void shouldWorkReindexForExistingIndex() { .as(StepVerifier::create) .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) .verifyComplete(); - NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + Query searchQuery = operations.matchAllQuery(); operations.count(searchQuery, SampleEntity.class, IndexCoordinates.of(destIndexName)) .as(StepVerifier::create) .expectNext(1L) From 77395502082e511e4b868efbd6231bcbfdda194c Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Mon, 24 Jan 2022 11:17:26 +0800 Subject: [PATCH 6/8] move to upper level package --- .../ReindexRequest.java} | 47 ++++++++++++++----- .../ReindexResponse.java} | 17 ++++++- .../core/{index => }/reindex/Remote.java | 0 .../core/reindex/package-info.java | 0 4 files changed, 51 insertions(+), 13 deletions(-) rename src/main/java/org/springframework/data/elasticsearch/core/{index/reindex/PostReindexRequest.java => reindex/ReindexRequest.java} (83%) rename src/main/java/org/springframework/data/elasticsearch/core/{index/reindex/PostReindexResponse.java => reindex/ReindexResponse.java} (93%) rename src/main/java/org/springframework/data/elasticsearch/core/{index => }/reindex/Remote.java (100%) create mode 100644 src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java similarity index 83% rename from src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java rename to src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java index 251b28fcb..71c8b9bfe 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java @@ -1,4 +1,19 @@ -package org.springframework.data.elasticsearch.core.index.reindex; +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; import org.elasticsearch.index.query.QueryBuilder; import org.springframework.data.elasticsearch.annotations.Document; @@ -34,11 +49,15 @@ public class PostReindexRequest { @Nullable private final Duration scroll; @Nullable private final Integer slices; - PostReindexRequest(@Nullable Integer maxDocs, @Nullable String conflicts, Source source, Dest dest, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { - this.maxDocs = maxDocs; - this.conflicts = conflicts; + private PostReindexRequest(Source source, Dest dest,@Nullable Integer maxDocs, @Nullable String conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + + Assert.notNull(source, "source must not be null"); + Assert.notNull(dest, "dest must not be null"); + this.source = source; this.dest = dest; + this.maxDocs = maxDocs; + this.conflicts = conflicts; this.script = script; this.timeout = timeout; this.requireAlias = requireAlias; @@ -119,9 +138,12 @@ public static class Source { @Nullable private Integer size; @Nullable private SourceFilter sourceFilter; - Source(IndexCoordinates indexes){ + private Source(IndexCoordinates indexes){ + Assert.notNull(indexes, "indexes must not be null"); + this.indexes = indexes; } + public IndexCoordinates getIndexes() { return indexes; } @@ -153,19 +175,19 @@ public SourceFilter getSourceFilter() { } public static class Slice { - private final Integer id; - private final Integer max; + private final int id; + private final int max; - Slice(Integer id, Integer max) { + private Slice(int id, int max) { this.id = id; this.max = max; } - public Integer getId() { + public int getId() { return id; } - public Integer getMax() { + public int getMax() { return max; } } @@ -178,7 +200,7 @@ public static class Dest { @Nullable private Document.VersionType versionType; @Nullable private IndexQuery.OpType opType; - Dest(IndexCoordinates index) { + private Dest(IndexCoordinates index) { Assert.notNull(index, "dest index must not be null"); this.index = index; } @@ -242,6 +264,7 @@ public static final class PostReindexRequestBuilder { @Nullable private Integer slices; public PostReindexRequestBuilder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + this.source = new Source(sourceIndex); this.dest = new Dest(destIndex); } @@ -345,7 +368,7 @@ public PostReindexRequestBuilder withSlices(int slices){ // endregion public PostReindexRequest build() { - return new PostReindexRequest(maxDocs, conflicts, source, dest, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + return new PostReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java similarity index 93% rename from src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java rename to src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java index 0ec336cca..138c8bb41 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java @@ -1,4 +1,19 @@ -package org.springframework.data.elasticsearch.core.index.reindex; +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; import org.springframework.lang.Nullable; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java similarity index 100% rename from src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java rename to src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java new file mode 100644 index 000000000..e69de29bb From addf096f7a0da5cd247b311a87c0600890998161 Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Mon, 24 Jan 2022 14:47:46 +0800 Subject: [PATCH 7/8] rename ReindexRequest and ReindexResponse --- .../DefaultReactiveElasticsearchClient.java | 2 +- .../reactive/ReactiveElasticsearchClient.java | 22 +++-- .../client/reactive/RequestCreator.java | 6 ++ .../core/DocumentOperations.java | 16 ++-- .../core/ElasticsearchRestTemplate.java | 15 ++-- .../core/ReactiveDocumentOperations.java | 16 ++-- .../core/ReactiveElasticsearchTemplate.java | 17 ++-- .../elasticsearch/core/ResponseConverter.java | 20 +++-- .../core/reindex/ReindexRequest.java | 86 +++++++++++-------- .../core/reindex/ReindexResponse.java | 53 ++++++------ .../elasticsearch/core/reindex/Remote.java | 22 ++++- .../core/reindex/package-info.java | 3 + .../core/ElasticsearchTemplateTests.java | 12 +-- ...ElasticsearchTemplateIntegrationTests.java | 17 ++-- 14 files changed, 179 insertions(+), 128 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 9ace9a169..cd5d2beb3 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -519,7 +519,7 @@ public Mono reindex(HttpHeaders headers, ReindexRequest re } @Override - public Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest) { + public Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) { return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) .next() .map(TaskSubmissionResponse::getTask); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 7b5ca47dd..b5edb1904 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -720,6 +720,7 @@ default Mono bulk(BulkRequest bulkRequest) { * * @param consumer must not be {@literal null} * @return the {@link Mono} emitting the response + * @since 4.4 */ default Mono reindex(Consumer consumer){ @@ -733,6 +734,7 @@ default Mono reindex(Consumer consumer){ * * @param reindexRequest must not be {@literal null} * @return the {@link Mono} emitting the response + * @since 4.4 */ default Mono reindex(ReindexRequest reindexRequest){ return reindex(HttpHeaders.EMPTY, reindexRequest); @@ -744,6 +746,7 @@ default Mono reindex(ReindexRequest reindexRequest){ * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param reindexRequest must not be {@literal null} * @return the {@link Mono} emitting the response + * @since 4.4 */ Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest); @@ -751,23 +754,25 @@ default Mono reindex(ReindexRequest reindexRequest){ * Execute the given {@link ReindexRequest} against the {@literal reindex} API. * * @param consumer must not be {@literal null} - * @return the {@link Mono} emitting the task + * @return the {@link Mono} emitting the task id + * @since 4.4 */ - default Mono submitReindexTask(Consumer consumer){ + default Mono submitReindex(Consumer consumer){ ReindexRequest reindexRequest = new ReindexRequest(); consumer.accept(reindexRequest); - return submitReindexTask(reindexRequest); + return submitReindex(reindexRequest); } /** * Execute the given {@link ReindexRequest} against the {@literal reindex} API. * * @param reindexRequest must not be {@literal null} - * @return the {@link Mono} emitting the task + * @return the {@link Mono} emitting the task id + * @since 4.4 */ - default Mono submitReindexTask(ReindexRequest reindexRequest){ - return submitReindexTask(HttpHeaders.EMPTY, reindexRequest); + default Mono submitReindex(ReindexRequest reindexRequest){ + return submitReindex(HttpHeaders.EMPTY, reindexRequest); } /** @@ -775,9 +780,10 @@ default Mono submitReindexTask(ReindexRequest reindexRequest){ * * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. * @param reindexRequest must not be {@literal null} - * @return the {@link Mono} emitting the task + * @return the {@link Mono} emitting the task id + * @since 4.4 */ - Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest); + Mono submitReindex(HttpHeaders headers, ReindexRequest reindexRequest); /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 0dd71c5b4..0d1b5ecec 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -291,7 +291,13 @@ default Function clusterHealth() { return RequestConverters::clusterHealth; } + /** + * @since 4.4 + */ default Function reindex() { return RequestConverters::reindex; } + /** + * @since 4.4 + */ default Function submitReindex() { return RequestConverters::submitReindex; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index 772989e2d..fb66a4b99 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,8 +18,8 @@ import java.util.Collection; import java.util.List; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -332,17 +332,19 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * For example, you cannot reindex a data stream into itself. * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * - * @param postReindexRequest reindex request parameters + * @param reindexRequest reindex request parameters * @return the reindex response + * @since 4.4 */ - PostReindexResponse reindex(PostReindexRequest postReindexRequest); + ReindexResponse reindex(ReindexRequest reindexRequest); /** * Submits a reindex task. * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * - * @param postReindexRequest reindex request parameters - * @return the task + * @param reindexRequest reindex request parameters + * @return the task id + * @since 4.4 */ - String submitReindexTask(PostReindexRequest postReindexRequest); + String submitReindex(ReindexRequest reindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index c2874847c..b355e25bd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -53,7 +53,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -62,9 +61,9 @@ import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -282,21 +281,21 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) } @Override - public PostReindexResponse reindex(PostReindexRequest postReindexRequest) { + public ReindexResponse reindex(ReindexRequest postReindexRequest) { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); - final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); final BulkByScrollResponse bulkByScrollResponse = execute( client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); - return ResponseConverter.postReindexResponseOf(bulkByScrollResponse); + return ResponseConverter.reindexResponseOf(bulkByScrollResponse); } @Override - public String submitReindexTask(PostReindexRequest postReindexRequest) { + public String submitReindex(ReindexRequest postReindexRequest) { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); - final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); return execute( client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index f04b7dbb2..59a2e5675 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,8 +15,8 @@ */ package org.springframework.data.elasticsearch.core; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -312,17 +312,19 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * For example, you cannot reindex a data stream into itself. * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * - * @param postReindexRequest reindex request parameters + * @param reindexRequest reindex request parameters * @return a {@link Mono} emitting the reindex response + * @since 4.4 */ - Mono reindex(PostReindexRequest postReindexRequest); + Mono reindex(ReindexRequest reindexRequest); /** * Submits a reindex task. * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * - * @param postReindexRequest reindex request parameters - * @return a {@link Mono} emitting the {@literal task}. + * @param reindexRequest reindex request parameters + * @return a {@link Mono} emitting the {@literal task} id. + * @since 4.4 */ - Mono submitReindexTask(PostReindexRequest postReindexRequest); + Mono submitReindex(ReindexRequest reindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 2ac177fa2..d3bf9ad60 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -46,7 +46,6 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.reactivestreams.Publisher; @@ -71,8 +70,8 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -614,24 +613,24 @@ public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordin } @Override - public Mono reindex(PostReindexRequest postReindexRequest) { + public Mono reindex(ReindexRequest postReindexRequest) { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); return Mono.defer(() -> { - final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); - return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::postReindexResponseOf); + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf); }); } @Override - public Mono submitReindexTask(PostReindexRequest postReindexRequest) { + public Mono submitReindex(ReindexRequest postReindexRequest) { Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); return Mono.defer(() -> { - final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); - return Mono.from(execute(client -> client.submitReindexTask(reindexRequest))); + final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.submitReindex(reindexRequest))); }); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index e4be000b1..8efe62224 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -45,7 +45,7 @@ import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.TemplateData; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -389,13 +389,16 @@ public static ByQueryResponse.SearchFailure byQueryResponseSearchFailureOf( // region postReindexResponse - public static PostReindexResponse postReindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ - final List failures = bulkByScrollResponse.getBulkFailures() // + /** + * @since 4.4 + */ + public static ReindexResponse reindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + final List failures = bulkByScrollResponse.getBulkFailures() // .stream() // - .map(ResponseConverter::postReindexResponseFailureOf) // + .map(ResponseConverter::reindexResponseFailureOf) // .collect(Collectors.toList()); // - return PostReindexResponse.builder() // + return ReindexResponse.builder() // .withTook(bulkByScrollResponse.getTook().getMillis()) // .withTimedOut(bulkByScrollResponse.isTimedOut()) // .withTotal(bulkByScrollResponse.getTotal()) // @@ -414,8 +417,11 @@ public static PostReindexResponse postReindexResponseOf(BulkByScrollResponse bul } - public static PostReindexResponse.Failure postReindexResponseFailureOf(BulkItemResponse.Failure failure) { - return PostReindexResponse.Failure.builder() // + /** + * @since 4.4 + */ + public static ReindexResponse.Failure reindexResponseFailureOf(BulkItemResponse.Failure failure) { + return ReindexResponse.Failure.builder() // .withIndex(failure.getIndex()) // .withType(failure.getType()) // .withId(failure.getId()) // diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java index 71c8b9bfe..a9e95de15 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexRequest.java @@ -15,10 +15,10 @@ */ package org.springframework.data.elasticsearch.core.reindex; -import org.elasticsearch.index.query.QueryBuilder; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.Query; import org.springframework.data.elasticsearch.core.query.SourceFilter; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -30,14 +30,15 @@ * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) * * @author Sijia Liu + * @since 4.4 */ -public class PostReindexRequest { +public class ReindexRequest { // Request body private final Source source; private final Dest dest; @Nullable private final Integer maxDocs; - @Nullable private final String conflicts; + @Nullable private final Conflicts conflicts; @Nullable private final Script script; // Query parameters @@ -49,7 +50,7 @@ public class PostReindexRequest { @Nullable private final Duration scroll; @Nullable private final Integer slices; - private PostReindexRequest(Source source, Dest dest,@Nullable Integer maxDocs, @Nullable String conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + private ReindexRequest(Source source, Dest dest, @Nullable Integer maxDocs, @Nullable Conflicts conflicts, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { Assert.notNull(source, "source must not be null"); Assert.notNull(dest, "dest must not be null"); @@ -87,7 +88,7 @@ public Script getScript() { } @Nullable - public String getConflicts() { + public Conflicts getConflicts() { return conflicts; } @@ -126,13 +127,17 @@ public Integer getSlices() { return slices; } - public static PostReindexRequestBuilder builder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { - return new PostReindexRequestBuilder(sourceIndex, destIndex); + public static ReindexRequestBuilder builder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + return new ReindexRequestBuilder(sourceIndex, destIndex); + } + + public enum Conflicts { + PROCEED, ABORT } public static class Source { private final IndexCoordinates indexes; - @Nullable private QueryBuilder query; + @Nullable private Query query; @Nullable private Remote remote; @Nullable private Slice slice; @Nullable private Integer size; @@ -143,7 +148,7 @@ private Source(IndexCoordinates indexes){ this.indexes = indexes; } - + public IndexCoordinates getIndexes() { return indexes; } @@ -154,7 +159,7 @@ public Remote getRemote() { } @Nullable - public QueryBuilder getQuery() { + public Query getQuery() { return query; } @@ -202,6 +207,7 @@ public static class Dest { private Dest(IndexCoordinates index) { Assert.notNull(index, "dest index must not be null"); + this.index = index; } @@ -232,9 +238,11 @@ public String getRouting() { public static class Script { private final String source; - private final String lang; + @Nullable private final String lang; + + private Script(String source, @Nullable String lang) { + Assert.notNull(source, "source must not be null"); - Script(String source, String lang) { this.source = source; this.lang = lang; } @@ -243,17 +251,18 @@ public String getSource() { return source; } + @Nullable public String getLang() { return lang; } } - public static final class PostReindexRequestBuilder { + public static final class ReindexRequestBuilder { private final Source source; private final Dest dest; @Nullable private Integer maxDocs; - @Nullable private String conflicts; + @Nullable private Conflicts conflicts; @Nullable private Script script; @Nullable private Duration timeout; @Nullable private Boolean requireAlias; @@ -263,112 +272,115 @@ public static final class PostReindexRequestBuilder { @Nullable private Duration scroll; @Nullable private Integer slices; - public PostReindexRequestBuilder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + public ReindexRequestBuilder(IndexCoordinates sourceIndex, IndexCoordinates destIndex) { + + Assert.notNull(sourceIndex, "sourceIndex must not be null"); + Assert.notNull(destIndex, "destIndex must not be null"); this.source = new Source(sourceIndex); this.dest = new Dest(destIndex); } // region setter - public PostReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { + + public ReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { this.maxDocs = maxDocs; return this; } - public PostReindexRequestBuilder withConflicts(String conflicts) { + public ReindexRequestBuilder withConflicts(Conflicts conflicts) { this.conflicts = conflicts; return this; } - public PostReindexRequestBuilder withSourceQuery(QueryBuilder query) { + public ReindexRequestBuilder withSourceQuery(Query query) { this.source.query = query; return this; } - public PostReindexRequestBuilder withSourceSlice(int id, int max){ + public ReindexRequestBuilder withSourceSlice(int id, int max){ this.source.slice = new Slice(id, max); return this; } - public PostReindexRequestBuilder withSourceRemote(Remote remote) { + public ReindexRequestBuilder withSourceRemote(Remote remote) { this.source.remote = remote; return this; } - public PostReindexRequestBuilder withSourceSize(int size) { + public ReindexRequestBuilder withSourceSize(int size) { this.source.size = size; return this; } - public PostReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ + public ReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ this.source.sourceFilter = sourceFilter; return this; } - public PostReindexRequestBuilder withDestPipeline(String pipelineName){ + public ReindexRequestBuilder withDestPipeline(String pipelineName){ this.dest.pipeline = pipelineName; return this; } - public PostReindexRequestBuilder withDestRouting(String routing){ + public ReindexRequestBuilder withDestRouting(String routing){ this.dest.routing = routing; return this; } - public PostReindexRequestBuilder withDestVersionType(Document.VersionType versionType) { + public ReindexRequestBuilder withDestVersionType(Document.VersionType versionType) { this.dest.versionType = versionType; return this; } - public PostReindexRequestBuilder withDestOpType(IndexQuery.OpType opType) { + public ReindexRequestBuilder withDestOpType(IndexQuery.OpType opType) { this.dest.opType = opType; return this; } - - public PostReindexRequestBuilder withScript(String source, String lang) { + public ReindexRequestBuilder withScript(String source, @Nullable String lang) { this.script = new Script(source, lang); return this; } - public PostReindexRequestBuilder withTimeout(Duration timeout){ + public ReindexRequestBuilder withTimeout(Duration timeout){ this.timeout = timeout; return this; } - public PostReindexRequestBuilder withRequireAlias(boolean requireAlias){ + public ReindexRequestBuilder withRequireAlias(boolean requireAlias){ this.requireAlias = requireAlias; return this; } - public PostReindexRequestBuilder withRefresh(boolean refresh){ + public ReindexRequestBuilder withRefresh(boolean refresh){ this.refresh = refresh; return this; } - public PostReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ + public ReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ this.waitForActiveShards = waitForActiveShards; return this; } - public PostReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ + public ReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ this.requestsPerSecond = requestsPerSecond; return this; } - public PostReindexRequestBuilder withScroll(Duration scroll){ + public ReindexRequestBuilder withScroll(Duration scroll){ this.scroll = scroll; return this; } - public PostReindexRequestBuilder withSlices(int slices){ + public ReindexRequestBuilder withSlices(int slices){ this.slices = slices; return this; } // endregion - public PostReindexRequest build() { - return new PostReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + public ReindexRequest build() { + return new ReindexRequest(source, dest, maxDocs, conflicts, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); } } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java index 138c8bb41..05be38c65 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/ReindexResponse.java @@ -25,8 +25,9 @@ * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) * * @author Sijia Liu + * @since 4.4 */ -public class PostReindexResponse { +public class ReindexResponse { private final long took; private final boolean timedOut; @@ -43,9 +44,9 @@ public class PostReindexResponse { private final long throttledUntilMillis; private final List failures; - private PostReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, - long versionConflicts, long noops, long bulkRetries, long searchRetries, - long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { + private ReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + long versionConflicts, long noops, long bulkRetries, long searchRetries, + long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { this.took = took; this.timedOut = timedOut; this.total = total; @@ -165,12 +166,12 @@ public List getFailures() { } /** - * Create a new {@link PostReindexResponse} to build {@link PostReindexResponse} + * Create a new {@link ReindexResponseBuilder} to build {@link ReindexResponse} * - * @return a new {@link PostReindexResponse} to build {@link PostReindexResponse} + * @return a new {@link ReindexResponseBuilder} to build {@link ReindexResponse} */ - public static PostReindexResponseBuilder builder() { - return new PostReindexResponseBuilder(); + public static ReindexResponseBuilder builder() { + return new ReindexResponseBuilder(); } public static class Failure { @@ -306,7 +307,7 @@ public Failure build() { } } - public static final class PostReindexResponseBuilder { + public static final class ReindexResponseBuilder { private long took; private boolean timedOut; private long total; @@ -322,80 +323,80 @@ public static final class PostReindexResponseBuilder { private long throttledUntilMillis; private List failures = Collections.emptyList(); - private PostReindexResponseBuilder() {} + private ReindexResponseBuilder() {} - public PostReindexResponseBuilder withTook(long took) { + public ReindexResponseBuilder withTook(long took) { this.took = took; return this; } - public PostReindexResponseBuilder withTimedOut(boolean timedOut) { + public ReindexResponseBuilder withTimedOut(boolean timedOut) { this.timedOut = timedOut; return this; } - public PostReindexResponseBuilder withTotal(long total) { + public ReindexResponseBuilder withTotal(long total) { this.total = total; return this; } - public PostReindexResponseBuilder withUpdated(long updated) { + public ReindexResponseBuilder withUpdated(long updated) { this.updated = updated; return this; } - public PostReindexResponseBuilder withDeleted(long deleted) { + public ReindexResponseBuilder withDeleted(long deleted) { this.deleted = deleted; return this; } - public PostReindexResponseBuilder withBatches(int batches) { + public ReindexResponseBuilder withBatches(int batches) { this.batches = batches; return this; } - public PostReindexResponseBuilder withVersionConflicts(long versionConflicts) { + public ReindexResponseBuilder withVersionConflicts(long versionConflicts) { this.versionConflicts = versionConflicts; return this; } - public PostReindexResponseBuilder withNoops(long noops) { + public ReindexResponseBuilder withNoops(long noops) { this.noops = noops; return this; } - public PostReindexResponseBuilder withBulkRetries(long bulkRetries) { + public ReindexResponseBuilder withBulkRetries(long bulkRetries) { this.bulkRetries = bulkRetries; return this; } - public PostReindexResponseBuilder withSearchRetries(long searchRetries) { + public ReindexResponseBuilder withSearchRetries(long searchRetries) { this.searchRetries = searchRetries; return this; } - public PostReindexResponseBuilder withThrottledMillis(long throttledMillis){ + public ReindexResponseBuilder withThrottledMillis(long throttledMillis){ this.throttledMillis = throttledMillis; return this; } - public PostReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ + public ReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ this.requestsPerSecond = requestsPerSecond; return this; } - public PostReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ + public ReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ this.throttledUntilMillis = throttledUntilMillis; return this; } - public PostReindexResponseBuilder withFailures(List failures) { + public ReindexResponseBuilder withFailures(List failures) { this.failures = failures; return this; } - public PostReindexResponse build() { - return new PostReindexResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, + public ReindexResponse build() { + return new ReindexResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, searchRetries, throttledMillis, requestsPerSecond, throttledUntilMillis, failures); } } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java index bf8a2e231..1b11037c2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/Remote.java @@ -1,15 +1,31 @@ -package org.springframework.data.elasticsearch.core.index.reindex; +/* + * Copyright 2019-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.elasticsearch.core.reindex; -import org.elasticsearch.index.reindex.RemoteInfo; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import java.time.Duration; /** - * Remote info {@link RemoteInfo} + * Remote info + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#source) * * @author Sijia Liu + * @since 4.4 */ public class Remote { private final String scheme; diff --git a/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java b/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java index e69de29bb..2dfc0a174 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/reindex/package-info.java @@ -0,0 +1,3 @@ +@org.springframework.lang.NonNullApi +@org.springframework.lang.NonNullFields +package org.springframework.data.elasticsearch.core.reindex; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index 174aa1ea9..79a2898dc 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -83,8 +83,8 @@ import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.Setting; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexResponse; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.ScriptField; @@ -3660,9 +3660,9 @@ void shouldWorkReindexForExistingIndex() { String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final PostReindexRequest postReindexRequest = PostReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) + final ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) .withRefresh(true).build(); - final PostReindexResponse reindex = operations.reindex(postReindexRequest); + final ReindexResponse reindex = operations.reindex(reindexRequest); NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); assertThat(reindex.getTotal()).isEqualTo(1); assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); @@ -3674,9 +3674,9 @@ void shouldWorkSubmitReindexTask(){ indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final PostReindexRequest postReindexRequest = PostReindexRequest + final ReindexRequest reindexRequest = ReindexRequest .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)).build(); - String task = operations.submitReindexTask(postReindexRequest); + String task = operations.submitReindex(reindexRequest); // Maybe there should be a task api to detect whether the task exists assertThat(task).isNotBlank(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index b126ef6be..a19dab2e6 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -21,7 +21,7 @@ import static org.springframework.data.elasticsearch.annotations.FieldType.*; import static org.springframework.data.elasticsearch.utils.IdGenerator.*; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -98,6 +98,7 @@ * @author Russell Parry * @author Roman Puchkovskiy * @author George Popides + * @author Sijia Liu */ @SuppressWarnings("SpringJavaAutowiredMembersInspection") @SpringIntegrationTest @@ -1193,23 +1194,21 @@ void shouldReturnMonoOfReactiveSearchHits() { @Test // #1529 void shouldWorkReindexForExistingIndex() { String sourceIndexName = indexNameProvider.indexName(); - String documentId = nextIdAsString(); - ElasticsearchTemplateTests.SampleEntity sampleEntity = ElasticsearchTemplateTests.SampleEntity.builder().id(documentId).message("abc").build(); + SampleEntity sampleEntity = randomEntity("abc"); operations.save(sampleEntity).block(); indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final PostReindexRequest postReindexRequest = PostReindexRequest + final ReindexRequest reindexRequest = ReindexRequest .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) .withRefresh(true) .build(); - operations.reindex(postReindexRequest) + operations.reindex(reindexRequest) .as(StepVerifier::create) .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) .verifyComplete(); - NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); - operations.count(searchQuery, SampleEntity.class, IndexCoordinates.of(destIndexName)) + operations.count(operations.matchAllQuery(), SampleEntity.class, IndexCoordinates.of(destIndexName)) .as(StepVerifier::create) .expectNext(1L) .verifyComplete(); @@ -1221,10 +1220,10 @@ void shouldWorkSubmitReindexTask(){ indexNameProvider.increment(); String destIndexName = indexNameProvider.indexName(); operations.indexOps(IndexCoordinates.of(destIndexName)).create(); - final PostReindexRequest postReindexRequest = PostReindexRequest + final ReindexRequest reindexRequest = ReindexRequest .builder(IndexCoordinates.of(sourceIndexName), IndexCoordinates.of(destIndexName)) .build(); - operations.submitReindexTask(postReindexRequest) + operations.submitReindex(reindexRequest) .as(StepVerifier::create) .consumeNextWith(task -> assertThat(task).isNotBlank()) .verifyComplete(); From e331855b3c0a05938d9815c2a7def117c317f108 Mon Sep 17 00:00:00 2001 From: Sijia Liu Date: Mon, 24 Jan 2022 14:49:07 +0800 Subject: [PATCH 8/8] fix source queries must be allowed for reindexing without a remote --- .../elasticsearch/core/RequestFactory.java | 93 ++++++++++--------- .../core/RequestFactoryTests.java | 46 ++++++--- 2 files changed, 82 insertions(+), 57 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 356979169..bf67ace9c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -63,7 +63,6 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; @@ -93,12 +92,12 @@ import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Source; -import org.springframework.data.elasticsearch.core.index.reindex.Remote; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Dest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Slice; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Source; +import org.springframework.data.elasticsearch.core.reindex.Remote; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Dest; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest.Slice; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -394,40 +393,44 @@ public DeleteIndexTemplateRequest deleteIndexTemplateRequest(DeleteTemplateReque return new DeleteIndexTemplateRequest(deleteTemplateRequest.getTemplateName()); } - public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ - final ReindexRequest reindexRequest = new ReindexRequest(); - if(postReindexRequest.getConflicts() != null){ - reindexRequest.setConflicts(postReindexRequest.getConflicts()); + /** + * @since 4.4 + */ + public org.elasticsearch.index.reindex.ReindexRequest reindexRequest(ReindexRequest reindexRequest){ + final org.elasticsearch.index.reindex.ReindexRequest request = new org.elasticsearch.index.reindex.ReindexRequest(); + if(reindexRequest.getConflicts() != null){ + request.setConflicts(reindexRequest.getConflicts().name().toLowerCase(Locale.ROOT)); } - if(postReindexRequest.getMaxDocs() != null){ - reindexRequest.setMaxDocs(postReindexRequest.getMaxDocs()); + if(reindexRequest.getMaxDocs() != null){ + request.setMaxDocs(reindexRequest.getMaxDocs()); } // region source build - final Source source = postReindexRequest.getSource(); - reindexRequest.setSourceIndices(source.getIndexes().getIndexNames()); - if(source.getQuery() != null && source.getRemote() != null){ - reindexRequest.setSourceQuery(source.getQuery()); + final Source source = reindexRequest.getSource(); + request.setSourceIndices(source.getIndexes().getIndexNames()); + // source query will build from RemoteInfo if remote exist + if(source.getQuery() != null && source.getRemote() == null){ + request.setSourceQuery(getQuery(source.getQuery())); } if(source.getSize() != null){ - reindexRequest.setSourceBatchSize(source.getSize()); + request.setSourceBatchSize(source.getSize()); } if(source.getRemote() != null){ Remote remote = source.getRemote(); - QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : source.getQuery(); - BytesReference query = null; + QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : getQuery(source.getQuery()); + BytesReference query; try { XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); } catch (IOException e) { - // ignore it + throw new IllegalArgumentException("an IOException occurs while building the source query content",e); } - reindexRequest.setRemoteInfo(new RemoteInfo( + request.setRemoteInfo(new RemoteInfo( remote.getScheme(), remote.getHost(), remote.getPort(), remote.getPathPrefix(), - Objects.requireNonNull(query), + query, remote.getUsername(), remote.getPassword(), Collections.emptyMap(), @@ -438,34 +441,34 @@ public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ final Slice slice = source.getSlice(); if(slice != null){ - reindexRequest.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); + request.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); } final SourceFilter sourceFilter = source.getSourceFilter(); if(sourceFilter != null){ - reindexRequest.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); + request.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); } // endregion // region dest build - final Dest dest = postReindexRequest.getDest(); - reindexRequest.setDestIndex(dest.getIndex().getIndexName()) + final Dest dest = reindexRequest.getDest(); + request.setDestIndex(dest.getIndex().getIndexName()) .setDestRouting(dest.getRouting()) .setDestPipeline(dest.getPipeline()); final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); if(versionType != null){ - reindexRequest.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); + request.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); } final IndexQuery.OpType opType = dest.getOpType(); if(opType != null){ - reindexRequest.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); + request.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); } // endregion // region script build - final PostReindexRequest.Script script = postReindexRequest.getScript(); + final ReindexRequest.Script script = reindexRequest.getScript(); if(script != null){ - reindexRequest.setScript(new Script(DEFAULT_SCRIPT_TYPE, + request.setScript(new Script(DEFAULT_SCRIPT_TYPE, script.getLang(), script.getSource(), Collections.emptyMap() @@ -474,31 +477,31 @@ public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ // endregion // region query parameters build - final Duration timeout = postReindexRequest.getTimeout(); + final Duration timeout = reindexRequest.getTimeout(); if(timeout != null){ - reindexRequest.setTimeout(timeValueSeconds(timeout.getSeconds())); + request.setTimeout(timeValueSeconds(timeout.getSeconds())); } - if(postReindexRequest.getRefresh() != null){ - reindexRequest.setRefresh(postReindexRequest.getRefresh()); + if(reindexRequest.getRefresh() != null){ + request.setRefresh(reindexRequest.getRefresh()); } - if(postReindexRequest.getRequireAlias() != null){ - reindexRequest.setRequireAlias(postReindexRequest.getRequireAlias()); + if(reindexRequest.getRequireAlias() != null){ + request.setRequireAlias(reindexRequest.getRequireAlias()); } - if(postReindexRequest.getRequestsPerSecond() != null){ - reindexRequest.setRequestsPerSecond(postReindexRequest.getRequestsPerSecond()); + if(reindexRequest.getRequestsPerSecond() != null){ + request.setRequestsPerSecond(reindexRequest.getRequestsPerSecond()); } - final Duration scroll = postReindexRequest.getScroll(); + final Duration scroll = reindexRequest.getScroll(); if(scroll != null){ - reindexRequest.setScroll(timeValueSeconds(scroll.getSeconds())); + request.setScroll(timeValueSeconds(scroll.getSeconds())); } - if(postReindexRequest.getWaitForActiveShards() != null){ - reindexRequest.setWaitForActiveShards(ActiveShardCount.parseString(postReindexRequest.getWaitForActiveShards())); + if(reindexRequest.getWaitForActiveShards() != null){ + request.setWaitForActiveShards(ActiveShardCount.parseString(reindexRequest.getWaitForActiveShards())); } - if(postReindexRequest.getSlices() != null){ - reindexRequest.setSlices(postReindexRequest.getSlices()); + if(reindexRequest.getSlices() != null){ + request.setSlices(reindexRequest.getSlices()); } // endregion - return reindexRequest; + return request; } // endregion diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index f916ce599..e327d1c35 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -36,7 +36,6 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder.FilterFunctionBuilder; import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder; -import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentType; import org.json.JSONException; @@ -57,8 +56,8 @@ import org.springframework.data.elasticsearch.core.index.AliasActionParameters; import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; -import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; -import org.springframework.data.elasticsearch.core.index.reindex.Remote; +import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; +import org.springframework.data.elasticsearch.core.reindex.Remote; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; import org.springframework.data.elasticsearch.core.query.*; @@ -560,19 +559,19 @@ void shouldSetStoredFieldsOnSearchRequest() { } @Test // #1529 - void shouldCreatePostReindexRequest() throws IOException, JSONException { + void shouldCreateReindexRequest() throws IOException, JSONException { final String expected = "{\n" + " \"source\":{\n" + " \"remote\":{\n" + " \"username\":\"admin\",\n" + - " \"password\":\"admin\",\n" + + " \"password\":\"password\",\n" + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + " \"socket_timeout\":\"30s\",\n" + " \"connect_timeout\":\"30s\"\n" + " },\n" + " \"index\":[\"source_1\",\"source_2\"],\n" + " \"size\":5,\n" + - " \"query\":{\"match_all\":{\"boost\":1.0}},\n" + + " \"query\":{\"match_all\":{}},\n" + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + " \"slice\":{\"id\":1,\"max\":20}\n" + " },\n" + @@ -591,15 +590,15 @@ void shouldCreatePostReindexRequest() throws IOException, JSONException { Remote remote = Remote.builder("http", "localhost",9200) .withPathPrefix("elasticsearch") .withUsername("admin") - .withPassword("admin") + .withPassword("password") .withConnectTimeout(Duration.ofSeconds(30)) .withSocketTimeout(Duration.ofSeconds(30)).build(); - PostReindexRequest postReindexRequest = PostReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"), + ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source_1", "source_2"), IndexCoordinates.of("destination")) - .withConflicts("proceed") + .withConflicts(ReindexRequest.Conflicts.PROCEED) .withMaxDocs(10) - .withSourceQuery(QueryBuilders.matchAllQuery()) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) .withSourceSize(5) .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()) .withSourceRemote(remote) @@ -611,8 +610,31 @@ void shouldCreatePostReindexRequest() throws IOException, JSONException { .withScript("Math.max(1,2)", "java") .build(); - final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); - final String json = requestToString(reindexRequest); + final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); + + assertEquals(expected, json, false); + } + + @Test + void shouldAllowSourceQueryForReindexWithoutRemote() throws IOException, JSONException { + final String expected = "{\n" + + " \"source\":{\n" + + " \"index\":[\"source\"],\n" + + " \"query\":{\"match_all\":{}}\n" + + " },\n" + + " \"dest\":{\n" + + " \"index\":\"destination\",\n" + + " \"op_type\":\"index\",\n" + + " \"version_type\":\"internal\"\n" + + " }\n" + + "}"; + + ReindexRequest reindexRequest = ReindexRequest.builder(IndexCoordinates.of("source"), + IndexCoordinates.of("destination")) + .withSourceQuery(new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build()) + .build(); + + final String json = requestToString(requestFactory.reindexRequest(reindexRequest)); assertEquals(expected, json, false); }