diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java index 001e44b1c..9ace9a169 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java @@ -83,11 +83,13 @@ import org.elasticsearch.client.GetAliasesResponse; import org.elasticsearch.client.Request; import org.elasticsearch.client.indices.*; +import org.elasticsearch.client.tasks.TaskSubmissionResponse; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.rest.BytesRestResponse; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.script.mustache.SearchTemplateRequest; @@ -143,6 +145,7 @@ * @author Brian Clozel * @author Farid Faoudi * @author George Popides + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -509,6 +512,19 @@ public Mono bulk(HttpHeaders headers, BulkRequest bulkRequest) { .next(); } + @Override + public Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers) + .next(); + } + + @Override + public Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest) { + return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers) + .next() + .map(TaskSubmissionResponse::getTask); + } + @Override public Mono execute(ReactiveElasticsearchClientCallback callback) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java index 030243851..7b5ca47dd 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java @@ -54,6 +54,7 @@ import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.aggregations.Aggregation; @@ -76,6 +77,7 @@ * @author Henrique Amaral * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 * @see ClientConfiguration * @see ReactiveRestClients @@ -713,6 +715,69 @@ default Mono bulk(BulkRequest bulkRequest) { */ Mono bulk(HttpHeaders headers, BulkRequest bulkRequest); + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + default Mono reindex(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return reindex(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + default Mono reindex(ReindexRequest reindexRequest){ + return reindex(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the response + */ + Mono reindex(HttpHeaders headers, ReindexRequest reindexRequest); + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param consumer must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + default Mono submitReindexTask(Consumer consumer){ + + ReindexRequest reindexRequest = new ReindexRequest(); + consumer.accept(reindexRequest); + return submitReindexTask(reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + default Mono submitReindexTask(ReindexRequest reindexRequest){ + return submitReindexTask(HttpHeaders.EMPTY, reindexRequest); + } + + /** + * Execute the given {@link ReindexRequest} against the {@literal reindex} API. + * + * @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}. + * @param reindexRequest must not be {@literal null} + * @return the {@link Mono} emitting the task + */ + Mono submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest); /** * Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}. * {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and diff --git a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java index 34fb05016..0dd71c5b4 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java @@ -49,6 +49,7 @@ import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.mustache.SearchTemplateRequest; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; @@ -289,4 +290,8 @@ default Function getIndex() { default Function clusterHealth() { return RequestConverters::clusterHealth; } + + default Function reindex() { return RequestConverters::reindex; } + + default Function submitReindex() { return RequestConverters::submitReindex; } } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java index 6d5ad016d..82e9df984 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java @@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) { return request; } - public static Request reindex(ReindexRequest reindexRequest) throws IOException { + public static Request reindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, true); } - static Request submitReindex(ReindexRequest reindexRequest) throws IOException { + public static Request submitReindex(ReindexRequest reindexRequest) { return prepareReindexRequest(reindexRequest, false); } @@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool .withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards()) .withRequestsPerSecond(reindexRequest.getRequestsPerSecond()); + if(reindexRequest.getDestination().isRequireAlias()){ + params.putParam("require_alias", Boolean.TRUE.toString()); + } if (reindexRequest.getScrollTime() != null) { params.putParam("scroll", reindexRequest.getScrollTime()); } + params.putParam("slices", Integer.toString(reindexRequest.getSlices())); + if(reindexRequest.getMaxDocs() > -1){ + params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs())); + } request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE)); return request; } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java index d795462c1..772989e2d 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java @@ -18,6 +18,8 @@ import java.util.Collection; import java.util.List; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -34,6 +36,7 @@ * * @author Peter-Josef Meisch * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface DocumentOperations { @@ -322,4 +325,24 @@ default void bulkUpdate(List queries, IndexCoordinates index) { * @since 4.2 */ ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return the reindex response + */ + PostReindexResponse reindex(PostReindexRequest postReindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return the task + */ + String submitReindexTask(PostReindexRequest postReindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java index 84277ce34..c2874847c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.fetch.subphase.FetchSourceContext; import org.elasticsearch.search.suggest.SuggestBuilder; @@ -61,7 +62,9 @@ import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.document.DocumentAdapters; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; @@ -106,6 +109,7 @@ * @author Gyula Attila Csorogi * @author Massimiliano Poggi * @author Farid Faoudi + * @author Sijia Liu * @since 4.4 */ public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate { @@ -277,6 +281,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) return ResponseConverter.byQueryResponseOf(bulkByScrollResponse); } + @Override + public PostReindexResponse reindex(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final BulkByScrollResponse bulkByScrollResponse = execute( + client -> client.reindex(reindexRequest, RequestOptions.DEFAULT)); + return ResponseConverter.postReindexResponseOf(bulkByScrollResponse); + } + + @Override + public String submitReindexTask(PostReindexRequest postReindexRequest) { + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return execute( + client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask()); + } + public List doBulkOperation(List queries, BulkOptions bulkOptions, IndexCoordinates index) { BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java index 995a82897..f04b7dbb2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java @@ -15,6 +15,8 @@ */ package org.springframework.data.elasticsearch.core; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,6 +40,7 @@ * @author Aleksei Arsenev * @author Roman Puchkovskiy * @author Farid Faoudi + * @author Sijia Liu * @since 4.0 */ public interface ReactiveDocumentOperations { @@ -302,4 +305,24 @@ default Mono bulkUpdate(List queries, IndexCoordinates index) * @since 4.2 */ Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index); + + /** + * Copies documents from a source to a destination. + * The source can be any existing index, alias, or data stream. The destination must differ from the source. + * For example, you cannot reindex a data stream into itself. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return a {@link Mono} emitting the reindex response + */ + Mono reindex(PostReindexRequest postReindexRequest); + + /** + * Submits a reindex task. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @param postReindexRequest reindex request parameters + * @return a {@link Mono} emitting the {@literal task}. + */ + Mono submitReindexTask(PostReindexRequest postReindexRequest); } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java index 387f6d3fa..2ac177fa2 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.BulkByScrollResponse; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.search.suggest.SuggestBuilder; import org.reactivestreams.Publisher; @@ -70,6 +71,8 @@ import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback; import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback; import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -105,6 +108,7 @@ * @author Russell Parry * @author Thomas Geese * @author Farid Faoudi + * @author Sijia Liu * @since 3.2 */ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware { @@ -609,6 +613,28 @@ public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordin }); } + @Override + public Mono reindex(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::postReindexResponseOf); + }); + } + + @Override + public Mono submitReindexTask(PostReindexRequest postReindexRequest) { + + Assert.notNull(postReindexRequest, "postReindexRequest must not be null"); + + return Mono.defer(() -> { + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + return Mono.from(execute(client -> client.submitReindexTask(reindexRequest))); + }); + } + @Override public Mono delete(Query query, Class entityType) { return delete(query, entityType, getIndexCoordinatesFor(entityType)); diff --git a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java index 0251bfeab..948c73e5c 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/RequestFactory.java @@ -15,19 +15,15 @@ */ package org.springframework.data.elasticsearch.core; +import static org.elasticsearch.core.TimeValue.*; import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.elasticsearch.index.reindex.RemoteInfo.*; +import static org.elasticsearch.script.Script.*; import static org.springframework.util.CollectionUtils.*; +import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.elasticsearch.action.DocWriteRequest; @@ -58,6 +54,7 @@ import org.elasticsearch.client.indices.IndexTemplatesExistRequest; import org.elasticsearch.client.indices.PutIndexTemplateRequest; import org.elasticsearch.client.indices.PutMappingRequest; +import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoDistance; import org.elasticsearch.common.unit.DistanceUnit; import org.elasticsearch.core.TimeValue; @@ -66,6 +63,8 @@ import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.reindex.DeleteByQueryRequest; +import org.elasticsearch.index.reindex.ReindexRequest; +import org.elasticsearch.index.reindex.RemoteInfo; import org.elasticsearch.index.reindex.UpdateByQueryRequest; import org.elasticsearch.script.Script; import org.elasticsearch.search.builder.SearchSourceBuilder; @@ -73,6 +72,7 @@ import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.rescore.QueryRescoreMode; import org.elasticsearch.search.rescore.QueryRescorerBuilder; +import org.elasticsearch.search.slice.SliceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; import org.elasticsearch.search.sort.GeoDistanceSortBuilder; import org.elasticsearch.search.sort.ScoreSortBuilder; @@ -81,6 +81,8 @@ import org.elasticsearch.search.sort.SortMode; import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.search.suggest.SuggestBuilder; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; @@ -91,7 +93,12 @@ import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest; import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest; import org.springframework.data.elasticsearch.core.index.GetTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Source; +import org.springframework.data.elasticsearch.core.index.reindex.Remote; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Dest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest.Slice; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; @@ -387,6 +394,113 @@ public DeleteIndexTemplateRequest deleteIndexTemplateRequest(DeleteTemplateReque return new DeleteIndexTemplateRequest(deleteTemplateRequest.getTemplateName()); } + public ReindexRequest reindexRequest(PostReindexRequest postReindexRequest){ + final ReindexRequest reindexRequest = new ReindexRequest(); + if(postReindexRequest.getConflicts() != null){ + reindexRequest.setConflicts(postReindexRequest.getConflicts()); + } + if(postReindexRequest.getMaxDocs() != null){ + reindexRequest.setMaxDocs(postReindexRequest.getMaxDocs()); + } + // region source build + final Source source = postReindexRequest.getSource(); + reindexRequest.setSourceIndices(source.getIndexes().toArray(new String[]{})); + if(source.getQuery() != null && source.getRemote() != null){ + reindexRequest.setSourceQuery(source.getQuery()); + } + if(source.getSize() != null){ + reindexRequest.setSourceBatchSize(source.getSize()); + } + + if(source.getRemote() != null){ + Remote remote = source.getRemote(); + QueryBuilder queryBuilder = source.getQuery() == null ? QueryBuilders.matchAllQuery() : source.getQuery(); + BytesReference query = null; + try { + XContentBuilder builder = XContentBuilder.builder(QUERY_CONTENT_TYPE).prettyPrint(); + query = BytesReference.bytes(queryBuilder.toXContent(builder, ToXContent.EMPTY_PARAMS)); + } catch (IOException e) { + // ignore it + } + reindexRequest.setRemoteInfo(new RemoteInfo( + remote.getScheme(), + remote.getHost(), + remote.getPort(), + remote.getPathPrefix(), + Objects.requireNonNull(query), + remote.getUsername(), + remote.getPassword(), + Collections.emptyMap(), + remote.getSocketTimeout() == null ? DEFAULT_SOCKET_TIMEOUT : timeValueSeconds(remote.getSocketTimeout().getSeconds()), + remote.getConnectTimeout() == null ? DEFAULT_CONNECT_TIMEOUT : timeValueSeconds(remote.getConnectTimeout().getSeconds()) + )); + } + + final Slice slice = source.getSlice(); + if(slice != null){ + reindexRequest.getSearchRequest().source().slice(new SliceBuilder(slice.getId(), slice.getMax())); + } + final SourceFilter sourceFilter = source.getSourceFilter(); + if(sourceFilter != null){ + reindexRequest.getSearchRequest().source().fetchSource(sourceFilter.getIncludes(), sourceFilter.getExcludes()); + } + // endregion + + // region dest build + final Dest dest = postReindexRequest.getDest(); + reindexRequest.setDestIndex(dest.getIndex()) + .setDestRouting(dest.getRouting()) + .setDestPipeline(dest.getPipeline()); + + final org.springframework.data.elasticsearch.annotations.Document.VersionType versionType = dest.getVersionType(); + if(versionType != null){ + reindexRequest.setDestVersionType(VersionType.fromString(versionType.name().toLowerCase(Locale.ROOT))); + } + final IndexQuery.OpType opType = dest.getOpType(); + if(opType != null){ + reindexRequest.setDestOpType(opType.name().toLowerCase(Locale.ROOT)); + } + // endregion + + // region script build + final PostReindexRequest.Script script = postReindexRequest.getScript(); + if(script != null){ + reindexRequest.setScript(new Script(DEFAULT_SCRIPT_TYPE, + script.getLang(), + script.getSource(), + Collections.emptyMap() + )); + } + // endregion + + // region query parameters build + final Duration timeout = postReindexRequest.getTimeout(); + if(timeout != null){ + reindexRequest.setTimeout(timeValueSeconds(timeout.getSeconds())); + } + if(postReindexRequest.getRefresh() != null){ + reindexRequest.setRefresh(postReindexRequest.getRefresh()); + } + if(postReindexRequest.getRequireAlias() != null){ + reindexRequest.setRequireAlias(postReindexRequest.getRequireAlias()); + } + if(postReindexRequest.getRequestsPerSecond() != null){ + reindexRequest.setRequestsPerSecond(postReindexRequest.getRequestsPerSecond()); + } + final Duration scroll = postReindexRequest.getScroll(); + if(scroll != null){ + reindexRequest.setScroll(timeValueSeconds(scroll.getSeconds())); + } + if(postReindexRequest.getWaitForActiveShards() != null){ + reindexRequest.setWaitForActiveShards(ActiveShardCount.parseString(postReindexRequest.getWaitForActiveShards())); + } + if(postReindexRequest.getSlices() != null){ + reindexRequest.setSlices(postReindexRequest.getSlices()); + } + // endregion + return reindexRequest; + } + // endregion // region delete diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java index 8f9d7928e..e4be000b1 100644 --- a/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ResponseConverter.java @@ -45,6 +45,7 @@ import org.springframework.data.elasticsearch.core.index.AliasData; import org.springframework.data.elasticsearch.core.index.Settings; import org.springframework.data.elasticsearch.core.index.TemplateData; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -54,6 +55,7 @@ * * @author George Popides * @author Peter-Josef Meisch + * @author Sijia Liu * @since 4.2 */ public class ResponseConverter { @@ -384,4 +386,46 @@ public static ByQueryResponse.SearchFailure byQueryResponseSearchFailureOf( } // endregion + + // region postReindexResponse + + public static PostReindexResponse postReindexResponseOf(BulkByScrollResponse bulkByScrollResponse){ + final List failures = bulkByScrollResponse.getBulkFailures() // + .stream() // + .map(ResponseConverter::postReindexResponseFailureOf) // + .collect(Collectors.toList()); // + + return PostReindexResponse.builder() // + .withTook(bulkByScrollResponse.getTook().getMillis()) // + .withTimedOut(bulkByScrollResponse.isTimedOut()) // + .withTotal(bulkByScrollResponse.getTotal()) // + .withUpdated(bulkByScrollResponse.getUpdated()) // + .withDeleted(bulkByScrollResponse.getDeleted()) // + .withBatches(bulkByScrollResponse.getBatches()) // + .withVersionConflicts(bulkByScrollResponse.getVersionConflicts()) // + .withNoops(bulkByScrollResponse.getNoops()) // + .withBulkRetries(bulkByScrollResponse.getBulkRetries()) // + .withSearchRetries(bulkByScrollResponse.getSearchRetries()) // + .withThrottledMillis(bulkByScrollResponse.getStatus().getThrottled().getMillis()) // + .withRequestsPerSecond(bulkByScrollResponse.getStatus().getRequestsPerSecond()) // + .withThrottledUntilMillis(bulkByScrollResponse.getStatus().getThrottledUntil().getMillis()) // + .withFailures(failures) // + .build(); // + + } + + public static PostReindexResponse.Failure postReindexResponseFailureOf(BulkItemResponse.Failure failure) { + return PostReindexResponse.Failure.builder() // + .withIndex(failure.getIndex()) // + .withType(failure.getType()) // + .withId(failure.getId()) // + .withStatus(failure.getStatus().getStatus()) // + .withAborted(failure.isAborted()) // + .withCause(failure.getCause()) // + .withSeqNo(failure.getSeqNo()) // + .withTerm(failure.getTerm()) // + .build(); // + } + + // endregion } diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java new file mode 100644 index 000000000..64e26e4d3 --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexRequest.java @@ -0,0 +1,371 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.elasticsearch.index.query.QueryBuilder; +import org.springframework.data.elasticsearch.annotations.Document; +import org.springframework.data.elasticsearch.core.query.IndexQuery; +import org.springframework.data.elasticsearch.core.query.SourceFilter; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; + +import static org.springframework.util.CollectionUtils.*; + +/** + * Request to reindex some documents from one index to another. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html) + * + * @author Sijia Liu + */ +public class PostReindexRequest { + + // Request body + private final Source source; + private final Dest dest; + @Nullable private final Integer maxDocs; + @Nullable private final String conflicts; + @Nullable private final Script script; + + // Query parameters + @Nullable private final Duration timeout; + @Nullable private final Boolean requireAlias; + @Nullable private final Boolean refresh; + @Nullable private final String waitForActiveShards; + @Nullable private final Integer requestsPerSecond; + @Nullable private final Duration scroll; + @Nullable private final Integer slices; + + PostReindexRequest(@Nullable Integer maxDocs, @Nullable String conflicts, Source source, Dest dest, @Nullable Script script, @Nullable Duration timeout, @Nullable Boolean requireAlias, @Nullable Boolean refresh, @Nullable String waitForActiveShards, @Nullable Integer requestsPerSecond, @Nullable Duration scroll, @Nullable Integer slices) { + this.maxDocs = maxDocs; + this.conflicts = conflicts; + this.source = source; + this.dest = dest; + this.script = script; + this.timeout = timeout; + this.requireAlias = requireAlias; + this.refresh = refresh; + this.waitForActiveShards = waitForActiveShards; + this.requestsPerSecond = requestsPerSecond; + this.scroll = scroll; + this.slices = slices; + } + + @Nullable + public Integer getMaxDocs() { + return maxDocs; + } + + public Source getSource() { + return source; + } + + public Dest getDest() { + return dest; + } + + @Nullable + public Script getScript() { + return script; + } + + @Nullable + public String getConflicts() { + return conflicts; + } + + @Nullable + public Boolean getRequireAlias() { + return requireAlias; + } + + @Nullable + public Duration getTimeout() { + return timeout; + } + + @Nullable + public Boolean getRefresh() { + return refresh; + } + + @Nullable + public String getWaitForActiveShards() { + return waitForActiveShards; + } + + @Nullable + public Integer getRequestsPerSecond() { + return requestsPerSecond; + } + + @Nullable + public Duration getScroll() { + return scroll; + } + + @Nullable + public Integer getSlices() { + return slices; + } + + public static PostReindexRequestBuilder builder(String sourceIndex, String destIndex) { + return new PostReindexRequestBuilder(sourceIndex, destIndex); + } + + public static class Source { + private final List indexes = new ArrayList<>(); + @Nullable private QueryBuilder query; + @Nullable private Remote remote; + @Nullable private Slice slice; + @Nullable private Integer size; + @Nullable private SourceFilter sourceFilter; + + public List getIndexes() { + return indexes; + } + + @Nullable + public Remote getRemote() { + return remote; + } + + @Nullable + public QueryBuilder getQuery() { + return query; + } + + @Nullable + public Integer getSize() { + return size; + } + + @Nullable + public Slice getSlice() { + return slice; + } + + @Nullable + public SourceFilter getSourceFilter() { + return sourceFilter; + } + } + + public static class Slice { + private final Integer id; + private final Integer max; + + Slice(Integer id, Integer max) { + this.id = id; + this.max = max; + } + + public Integer getId() { + return id; + } + + public Integer getMax() { + return max; + } + } + + public static class Dest { + + private final String index; + @Nullable private String pipeline; + @Nullable private String routing; + @Nullable private Document.VersionType versionType; + @Nullable private IndexQuery.OpType opType; + + Dest(String index) { + Assert.notNull(index, "dest index must not be null"); + this.index = index; + } + + public String getIndex() { + return index; + } + + @Nullable + public Document.VersionType getVersionType() { + return versionType; + } + + @Nullable + public IndexQuery.OpType getOpType() { + return opType; + } + + @Nullable + public String getPipeline() { + return pipeline; + } + + @Nullable + public String getRouting() { + return routing; + } + } + + public static class Script { + private final String source; + private final String lang; + + Script(String source, String lang) { + this.source = source; + this.lang = lang; + } + + public String getSource() { + return source; + } + + public String getLang() { + return lang; + } + } + + public static final class PostReindexRequestBuilder { + + private final Source source; + private final Dest dest; + @Nullable private Integer maxDocs; + @Nullable private String conflicts; + @Nullable private Script script; + @Nullable private Duration timeout; + @Nullable private Boolean requireAlias; + @Nullable private Boolean refresh; + @Nullable private String waitForActiveShards; + @Nullable private Integer requestsPerSecond; + @Nullable private Duration scroll; + @Nullable private Integer slices; + + public PostReindexRequestBuilder(String sourceIndex, String destIndex) { + this.source = new Source(); + this.source.indexes.add(sourceIndex); + this.dest = new Dest(destIndex); + } + + // region setter + public PostReindexRequestBuilder withMaxDocs(@Nullable Integer maxDocs) { + this.maxDocs = maxDocs; + return this; + } + + public PostReindexRequestBuilder withConflicts(String conflicts) { + this.conflicts = conflicts; + return this; + } + + public PostReindexRequestBuilder addSourceIndex(String sourceIndex) { + Assert.notNull(sourceIndex, "source index must not be null"); + this.source.indexes.add(sourceIndex); + return this; + } + + public PostReindexRequestBuilder withSourceIndexes(List sourceIndexes) { + if (!isEmpty(sourceIndexes)) { + clearSourceIndexes(); + this.source.indexes.addAll(sourceIndexes); + } + return this; + } + + public PostReindexRequestBuilder clearSourceIndexes() { + this.source.indexes.clear(); + return this; + } + + public PostReindexRequestBuilder withSourceQuery(QueryBuilder query) { + this.source.query = query; + return this; + } + + public PostReindexRequestBuilder withSourceSlice(int id, int max){ + this.source.slice = new Slice(id, max); + return this; + } + + public PostReindexRequestBuilder withSourceRemote(Remote remote) { + this.source.remote = remote; + return this; + } + + public PostReindexRequestBuilder withSourceSize(int size) { + this.source.size = size; + return this; + } + + public PostReindexRequestBuilder withSourceSourceFilter(SourceFilter sourceFilter){ + this.source.sourceFilter = sourceFilter; + return this; + } + + public PostReindexRequestBuilder withDestPipeline(String pipelineName){ + this.dest.pipeline = pipelineName; + return this; + } + + public PostReindexRequestBuilder withDestRouting(String routing){ + this.dest.routing = routing; + return this; + } + + public PostReindexRequestBuilder withDestVersionType(Document.VersionType versionType) { + this.dest.versionType = versionType; + return this; + } + + public PostReindexRequestBuilder withDestOpType(IndexQuery.OpType opType) { + this.dest.opType = opType; + return this; + } + + + public PostReindexRequestBuilder withScript(String source, String lang) { + this.script = new Script(source, lang); + return this; + } + + public PostReindexRequestBuilder withTimeout(Duration timeout){ + this.timeout = timeout; + return this; + } + + public PostReindexRequestBuilder withRequireAlias(boolean requireAlias){ + this.requireAlias = requireAlias; + return this; + } + + public PostReindexRequestBuilder withRefresh(boolean refresh){ + this.refresh = refresh; + return this; + } + + public PostReindexRequestBuilder withWaitForActiveShards(String waitForActiveShards){ + this.waitForActiveShards = waitForActiveShards; + return this; + } + + public PostReindexRequestBuilder withRequestsPerSecond(int requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public PostReindexRequestBuilder withScroll(Duration scroll){ + this.scroll = scroll; + return this; + } + + public PostReindexRequestBuilder withSlices(int slices){ + this.slices = slices; + return this; + } + // endregion + + public PostReindexRequest build() { + return new PostReindexRequest(maxDocs, conflicts, source, dest, script, timeout, requireAlias, refresh, waitForActiveShards, requestsPerSecond, scroll, slices); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java new file mode 100644 index 000000000..820e7588c --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/PostReindexResponse.java @@ -0,0 +1,387 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.springframework.lang.Nullable; + +import java.util.Collections; +import java.util.List; + +/** + * Response of reindex request. + * (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-api-response-body) + * + * @author Onizuka + */ +public class PostReindexResponse { + + private final long took; + private final boolean timedOut; + private final long total; + private final long updated; + private final long deleted; + private final int batches; + private final long versionConflicts; + private final long noops; + private final long bulkRetries; + private final long searchRetries; + private final long throttledMillis; + private final double requestsPerSecond; + private final long throttledUntilMillis; + private final List failures; + + private PostReindexResponse(long took, boolean timedOut, long total, long updated, long deleted, int batches, + long versionConflicts, long noops, long bulkRetries, long searchRetries, + long throttledMillis, double requestsPerSecond, long throttledUntilMillis, List failures) { + this.took = took; + this.timedOut = timedOut; + this.total = total; + this.updated = updated; + this.deleted = deleted; + this.batches = batches; + this.versionConflicts = versionConflicts; + this.noops = noops; + this.bulkRetries = bulkRetries; + this.searchRetries = searchRetries; + this.throttledMillis = throttledMillis; + this.requestsPerSecond = requestsPerSecond; + this.throttledUntilMillis = throttledUntilMillis; + this.failures = failures; + } + + /** + * The number of milliseconds from start to end of the whole operation. + */ + public long getTook() { + return took; + } + + /** + * Did any of the sub-requests that were part of this request timeout? + */ + public boolean isTimedOut() { + return timedOut; + } + + /** + * The number of documents that were successfully processed. + */ + public long getTotal() { + return total; + } + + /** + * The number of documents that were successfully updated. + */ + public long getUpdated() { + return updated; + } + + /** + * The number of documents that were successfully deleted. + */ + public long getDeleted() { + return deleted; + } + + /** + * The number of scroll responses pulled back by the update by query. + */ + public int getBatches() { + return batches; + } + + /** + * The number of version conflicts that the update by query hit. + */ + public long getVersionConflicts() { + return versionConflicts; + } + + /** + * The number of documents that were ignored because the script used for the update by query returned a noop value for + * ctx.op. + */ + public long getNoops() { + return noops; + } + + /** + * The number of times that the request had retry bulk actions. + */ + public long getBulkRetries() { + return bulkRetries; + } + + /** + * The number of times that the request had retry search actions. + */ + public long getSearchRetries() { + return searchRetries; + } + + /** + * Number of milliseconds the request slept to conform to requests_per_second. + */ + public long getThrottledMillis() { + return throttledMillis; + } + + /** + * The number of requests per second effectively executed during the reindex. + */ + public double getRequestsPerSecond() { + return requestsPerSecond; + } + + /** + * This field should always be equal to zero in a _reindex response. + * It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) + * a throttled request will be executed again in order to conform to requests_per_second. + */ + public long getThrottledUntilMillis() { + return throttledUntilMillis; + } + + /** + * All of the bulk failures. Version conflicts are only included if the request sets abortOnVersionConflict to true + * (the default). + */ + public List getFailures() { + return failures; + } + + /** + * Create a new {@link PostReindexResponse} to build {@link PostReindexResponse} + * + * @return a new {@link PostReindexResponse} to build {@link PostReindexResponse} + */ + public static PostReindexResponseBuilder builder() { + return new PostReindexResponseBuilder(); + } + + public static class Failure { + + @Nullable private final String index; + @Nullable private final String type; + @Nullable private final String id; + @Nullable private final Exception cause; + @Nullable private final Integer status; + @Nullable private final Long seqNo; + @Nullable private final Long term; + @Nullable private final Boolean aborted; + + private Failure(@Nullable String index, @Nullable String type, @Nullable String id, @Nullable Exception cause, + @Nullable Integer status, @Nullable Long seqNo, @Nullable Long term, @Nullable Boolean aborted) { + this.index = index; + this.type = type; + this.id = id; + this.cause = cause; + this.status = status; + this.seqNo = seqNo; + this.term = term; + this.aborted = aborted; + } + + @Nullable + public String getIndex() { + return index; + } + + @Nullable + public String getType() { + return type; + } + + @Nullable + public String getId() { + return id; + } + + @Nullable + public Exception getCause() { + return cause; + } + + @Nullable + public Integer getStatus() { + return status; + } + + @Nullable + public Long getSeqNo() { + return seqNo; + } + + @Nullable + public Long getTerm() { + return term; + } + + @Nullable + public Boolean getAborted() { + return aborted; + } + + /** + * Create a new {@link Failure.FailureBuilder} to build {@link Failure} + * + * @return a new {@link Failure.FailureBuilder} to build {@link Failure} + */ + public static Failure.FailureBuilder builder() { + return new Failure.FailureBuilder(); + } + + /** + * Builder for {@link Failure} + */ + public static final class FailureBuilder { + @Nullable private String index; + @Nullable private String type; + @Nullable private String id; + @Nullable private Exception cause; + @Nullable private Integer status; + @Nullable private Long seqNo; + @Nullable private Long term; + @Nullable private Boolean aborted; + + private FailureBuilder() {} + + public Failure.FailureBuilder withIndex(String index) { + this.index = index; + return this; + } + + public Failure.FailureBuilder withType(String type) { + this.type = type; + return this; + } + + public Failure.FailureBuilder withId(String id) { + this.id = id; + return this; + } + + public Failure.FailureBuilder withCause(Exception cause) { + this.cause = cause; + return this; + } + + public Failure.FailureBuilder withStatus(Integer status) { + this.status = status; + return this; + } + + public Failure.FailureBuilder withSeqNo(Long seqNo) { + this.seqNo = seqNo; + return this; + } + + public Failure.FailureBuilder withTerm(Long term) { + this.term = term; + return this; + } + + public Failure.FailureBuilder withAborted(Boolean aborted) { + this.aborted = aborted; + return this; + } + + public Failure build() { + return new Failure(index, type, id, cause, status, seqNo, term, aborted); + } + } + } + + public static final class PostReindexResponseBuilder { + private long took; + private boolean timedOut; + private long total; + private long updated; + private long deleted; + private int batches; + private long versionConflicts; + private long noops; + private long bulkRetries; + private long searchRetries; + private long throttledMillis; + private double requestsPerSecond; + private long throttledUntilMillis; + private List failures = Collections.emptyList(); + + private PostReindexResponseBuilder() {} + + public PostReindexResponseBuilder withTook(long took) { + this.took = took; + return this; + } + + public PostReindexResponseBuilder withTimedOut(boolean timedOut) { + this.timedOut = timedOut; + return this; + } + + public PostReindexResponseBuilder withTotal(long total) { + this.total = total; + return this; + } + + public PostReindexResponseBuilder withUpdated(long updated) { + this.updated = updated; + return this; + } + + public PostReindexResponseBuilder withDeleted(long deleted) { + this.deleted = deleted; + return this; + } + + public PostReindexResponseBuilder withBatches(int batches) { + this.batches = batches; + return this; + } + + public PostReindexResponseBuilder withVersionConflicts(long versionConflicts) { + this.versionConflicts = versionConflicts; + return this; + } + + public PostReindexResponseBuilder withNoops(long noops) { + this.noops = noops; + return this; + } + + public PostReindexResponseBuilder withBulkRetries(long bulkRetries) { + this.bulkRetries = bulkRetries; + return this; + } + + public PostReindexResponseBuilder withSearchRetries(long searchRetries) { + this.searchRetries = searchRetries; + return this; + } + + public PostReindexResponseBuilder withThrottledMillis(long throttledMillis){ + this.throttledMillis = throttledMillis; + return this; + } + + public PostReindexResponseBuilder withRequestsPerSecond(double requestsPerSecond){ + this.requestsPerSecond = requestsPerSecond; + return this; + } + + public PostReindexResponseBuilder withThrottledUntilMillis(long throttledUntilMillis){ + this.throttledUntilMillis = throttledUntilMillis; + return this; + } + + public PostReindexResponseBuilder withFailures(List failures) { + this.failures = failures; + return this; + } + + public PostReindexResponse build() { + return new PostReindexResponse(took, timedOut, total, updated, deleted, batches, versionConflicts, noops, bulkRetries, + searchRetries, throttledMillis, requestsPerSecond, throttledUntilMillis, failures); + } + } +} diff --git a/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java new file mode 100644 index 000000000..320d1042d --- /dev/null +++ b/src/main/java/org/springframework/data/elasticsearch/core/index/reindex/Remote.java @@ -0,0 +1,126 @@ +package org.springframework.data.elasticsearch.core.index.reindex; + +import org.elasticsearch.index.reindex.RemoteInfo; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import java.time.Duration; + +/** + * Remote info {@link RemoteInfo} + * + * @author Onizuka + */ +public class Remote { + private final String scheme; + private final String host; + private final int port; + + @Nullable private final String pathPrefix; + @Nullable private final String username; + @Nullable private final String password; + @Nullable private final Duration socketTimeout; + @Nullable private final Duration connectTimeout; + + private Remote(String scheme, String host, int port, @Nullable String pathPrefix, @Nullable String username, @Nullable String password, @Nullable Duration socketTimeout, @Nullable Duration connectTimeout) { + + Assert.notNull(scheme, "scheme must not be null"); + Assert.notNull(host, "host must not be null"); + + this.scheme = scheme; + this.host = host; + this.port = port; + this.pathPrefix = pathPrefix; + this.username = username; + this.password = password; + this.socketTimeout = socketTimeout; + this.connectTimeout = connectTimeout; + } + + public String getHost() { + return host; + } + + @Nullable + public String getUsername() { + return username; + } + + @Nullable + public String getPassword() { + return password; + } + + @Nullable + public Duration getSocketTimeout() { + return socketTimeout; + } + + @Nullable + public Duration getConnectTimeout() { + return connectTimeout; + } + + public String getScheme() { + return scheme; + } + + public int getPort() { + return port; + } + + @Nullable + public String getPathPrefix() { + return pathPrefix; + } + + public static RemoteBuilder builder(String scheme, String host, int port){ + return new RemoteBuilder(scheme, host, port); + } + + public static class RemoteBuilder{ + private final String scheme; + private final String host; + private final int port; + @Nullable private String pathPrefix; + @Nullable private String username; + @Nullable private String password; + @Nullable private Duration socketTimeout; + @Nullable private Duration connectTimeout; + + public RemoteBuilder(String scheme, String host, int port) { + this.scheme = scheme; + this.host = host; + this.port = port; + } + + public RemoteBuilder withPathPrefix(String pathPrefix){ + this.pathPrefix = pathPrefix; + return this; + } + + public RemoteBuilder withUsername(String username){ + this.username = username; + return this; + } + + public RemoteBuilder withPassword(String password){ + this.password = password; + return this; + } + + public RemoteBuilder withSocketTimeout(Duration socketTimeout){ + this.socketTimeout = socketTimeout; + return this; + } + + public RemoteBuilder withConnectTimeout(Duration connectTimeout){ + this.connectTimeout = connectTimeout; + return this; + } + + public Remote build(){ + return new Remote(scheme, host, port , pathPrefix, username, password, socketTimeout, connectTimeout); + } + } +} diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index d75218eff..82ebfd958 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -83,6 +83,8 @@ import org.springframework.data.elasticsearch.annotations.MultiField; import org.springframework.data.elasticsearch.annotations.ScriptedField; import org.springframework.data.elasticsearch.annotations.Setting; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import org.springframework.data.elasticsearch.core.query.NativeSearchQuery; import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; import org.springframework.data.elasticsearch.core.query.ScriptField; @@ -126,6 +128,7 @@ * @author Subhobrata Dey * @author Farid Faoudi * @author Peer Mueller + * @author Sijia Liu */ @SpringIntegrationTest public abstract class ElasticsearchTemplateTests { @@ -3646,6 +3649,34 @@ void shouldNotErrorOnSortWithUnmappedFieldAndUnmappedTypeSettings() { operations.search(query, SampleEntity.class); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + String documentId = nextIdAsString(); + SampleEntity sampleEntity = SampleEntity.builder().id(documentId).message("abc").build(); + operations.save(sampleEntity); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + + final PostReindexResponse reindex = operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()); + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + assertThat(reindex.getTotal()).isEqualTo(1); + assertThat(operations.count(searchQuery, IndexCoordinates.of(destIndexName))).isEqualTo(1); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + String task = operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()); + // Maybe there should be a task api to detect whether the task exists + assertThat(task).isNotBlank(); + } + // region entities @Document(indexName = "#{@indexNameProvider.indexName()}") @Setting(shards = 1, replicas = 0, refreshInterval = "-1") diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java index 0b34f9fa6..e31a61595 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplateIntegrationTests.java @@ -19,7 +19,11 @@ import static org.assertj.core.api.Assertions.*; import static org.elasticsearch.index.query.QueryBuilders.*; import static org.springframework.data.elasticsearch.annotations.FieldType.*; +import static org.springframework.data.elasticsearch.utils.IdGenerator.*; +import org.elasticsearch.common.Strings; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -1188,6 +1192,38 @@ void shouldReturnMonoOfReactiveSearchHits() { .verifyComplete(); } + @Test // #1529 + void shouldWorkReindexForExistingIndex() { + String sourceIndexName = indexNameProvider.indexName(); + String documentId = nextIdAsString(); + ElasticsearchTemplateTests.SampleEntity sampleEntity = ElasticsearchTemplateTests.SampleEntity.builder().id(documentId).message("abc").build(); + operations.save(sampleEntity).block(); + + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + operations.reindex(PostReindexRequest.builder(sourceIndexName, destIndexName).withRefresh(true).build()) + .as(StepVerifier::create) + .consumeNextWith(postReindexResponse -> assertThat(postReindexResponse.getTotal()).isEqualTo(1L)) + .verifyComplete(); + NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(matchAllQuery()).build(); + operations.count(searchQuery, SampleEntity.class, IndexCoordinates.of(destIndexName)) + .as(StepVerifier::create) + .expectNext(1L) + .verifyComplete(); + } + + @Test // #1529 + void shouldWorkSubmitReindexTask(){ + String sourceIndexName = indexNameProvider.indexName(); + indexNameProvider.increment(); + String destIndexName = indexNameProvider.indexName(); + operations.indexOps(IndexCoordinates.of(destIndexName)).create(); + operations.submitReindexTask(PostReindexRequest.builder(sourceIndexName, destIndexName).build()) + .as(StepVerifier::create) + .consumeNextWith(task -> assertThat(task).isNotBlank()) + .verifyComplete(); + } // endregion // region Helper functions diff --git a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java index f513abc97..c093efe27 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/RequestFactoryTests.java @@ -36,6 +36,7 @@ import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder; import org.elasticsearch.index.query.functionscore.FunctionScoreQueryBuilder.FilterFunctionBuilder; import org.elasticsearch.index.query.functionscore.GaussDecayFunctionBuilder; +import org.elasticsearch.index.reindex.ReindexRequest; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentType; import org.json.JSONException; @@ -48,6 +49,7 @@ import org.springframework.data.annotation.Id; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; import org.springframework.data.elasticsearch.core.geo.GeoPoint; @@ -55,18 +57,12 @@ import org.springframework.data.elasticsearch.core.index.AliasActionParameters; import org.springframework.data.elasticsearch.core.index.AliasActions; import org.springframework.data.elasticsearch.core.index.PutTemplateRequest; +import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest; +import org.springframework.data.elasticsearch.core.index.reindex.Remote; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; -import org.springframework.data.elasticsearch.core.query.Criteria; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.IndexQueryBuilder; -import org.springframework.data.elasticsearch.core.query.NativeSearchQueryBuilder; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.RescorerQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.query.RescorerQuery.ScoreMode; -import org.springframework.data.elasticsearch.core.query.SeqNoPrimaryTerm; import org.springframework.lang.Nullable; /** @@ -74,6 +70,7 @@ * @author Roman Puchkovskiy * @author Peer Mueller * @author vdisk + * @author Sijia Liu */ @SuppressWarnings("ConstantConditions") @ExtendWith(MockitoExtension.class) @@ -562,6 +559,64 @@ void shouldSetStoredFieldsOnSearchRequest() { .isEqualTo(Arrays.asList("last-name", "current-location")); } + @Test // #1529 + void shouldCreatePostReindexRequest() throws IOException, JSONException { + final String expected = "{\n" + + " \"source\":{\n" + + " \"remote\":{\n" + + " \"username\":\"admin\",\n" + + " \"password\":\"admin\",\n" + + " \"host\":\"http://localhost:9200/elasticsearch\",\n" + + " \"socket_timeout\":\"30s\",\n" + + " \"connect_timeout\":\"30s\"\n" + + " },\n" + + " \"index\":[\"source_1\",\"source_2\"],\n" + + " \"size\":5,\n" + + " \"query\":{\"match_all\":{\"boost\":1.0}},\n" + + " \"_source\":{\"includes\":[\"name\"],\"excludes\":[]},\n" + + " \"slice\":{\"id\":1,\"max\":20}\n" + + " },\n" + + " \"dest\":{\n" + + " \"index\":\"destination\",\n" + + " \"routing\":\"routing\",\n" + + " \"op_type\":\"create\",\n" + + " \"pipeline\":\"pipeline\",\n" + + " \"version_type\":\"external\"\n" + + " },\n" + + " \"max_docs\":10,\n" + + " \"script\":{\"source\":\"Math.max(1,2)\",\"lang\":\"java\"},\n" + + " \"conflicts\":\"proceed\"\n" + + "}"; + + Remote remote = Remote.builder("http", "localhost",9200) + .withPathPrefix("elasticsearch") + .withUsername("admin") + .withPassword("admin") + .withConnectTimeout(Duration.ofSeconds(30)) + .withSocketTimeout(Duration.ofSeconds(30)).build(); + + PostReindexRequest postReindexRequest = PostReindexRequest.builder("source_1", "destination") + .addSourceIndex("source_2") + .withConflicts("proceed") + .withMaxDocs(10) + .withSourceQuery(QueryBuilders.matchAllQuery()) + .withSourceSize(5) + .withSourceSourceFilter(new FetchSourceFilterBuilder().withIncludes("name").build()) + .withSourceRemote(remote) + .withSourceSlice(1,20) + .withDestOpType(IndexQuery.OpType.CREATE) + .withDestVersionType(Document.VersionType.EXTERNAL) + .withDestPipeline("pipeline") + .withDestRouting("routing") + .withScript("Math.max(1,2)", "java") + .build(); + + final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest); + final String json = requestToString(reindexRequest); + + assertEquals(expected, json, false); + } + // region entities static class Person { @Nullable