Skip to content

Commit 84eac20

Browse files
committed
Implement support for reindex API [DATAES-955] spring-projects#1529
1 parent dc6d7a0 commit 84eac20

16 files changed

+1373
-20
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+16
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@
8383
import org.elasticsearch.client.GetAliasesResponse;
8484
import org.elasticsearch.client.Request;
8585
import org.elasticsearch.client.indices.*;
86+
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
8687
import org.elasticsearch.core.TimeValue;
8788
import org.elasticsearch.index.get.GetResult;
8889
import org.elasticsearch.index.reindex.BulkByScrollResponse;
8990
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
9091
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
92+
import org.elasticsearch.index.reindex.ReindexRequest;
9193
import org.elasticsearch.rest.BytesRestResponse;
9294
import org.elasticsearch.rest.RestStatus;
9395
import org.elasticsearch.script.mustache.SearchTemplateRequest;
@@ -143,6 +145,7 @@
143145
* @author Brian Clozel
144146
* @author Farid Faoudi
145147
* @author George Popides
148+
* @author Sijia Liu
146149
* @since 3.2
147150
* @see ClientConfiguration
148151
* @see ReactiveRestClients
@@ -509,6 +512,19 @@ public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
509512
.next();
510513
}
511514

515+
@Override
516+
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
517+
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers)
518+
.next();
519+
}
520+
521+
@Override
522+
public Mono<String> submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest) {
523+
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers)
524+
.next()
525+
.map(TaskSubmissionResponse::getTask);
526+
}
527+
512528
@Override
513529
public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
514530

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+65
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5555
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5656
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
57+
import org.elasticsearch.index.reindex.ReindexRequest;
5758
import org.elasticsearch.script.mustache.SearchTemplateRequest;
5859
import org.elasticsearch.search.SearchHit;
5960
import org.elasticsearch.search.aggregations.Aggregation;
@@ -76,6 +77,7 @@
7677
* @author Henrique Amaral
7778
* @author Thomas Geese
7879
* @author Farid Faoudi
80+
* @author Sijia Liu
7981
* @since 3.2
8082
* @see ClientConfiguration
8183
* @see ReactiveRestClients
@@ -713,6 +715,69 @@ default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
713715
*/
714716
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);
715717

718+
/**
719+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
720+
*
721+
* @param consumer must not be {@literal null}
722+
* @return the {@link Mono} emitting the response
723+
*/
724+
default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer){
725+
726+
ReindexRequest reindexRequest = new ReindexRequest();
727+
consumer.accept(reindexRequest);
728+
return reindex(reindexRequest);
729+
}
730+
731+
/**
732+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
733+
*
734+
* @param reindexRequest must not be {@literal null}
735+
* @return the {@link Mono} emitting the response
736+
*/
737+
default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest){
738+
return reindex(HttpHeaders.EMPTY, reindexRequest);
739+
}
740+
741+
/**
742+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
743+
*
744+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
745+
* @param reindexRequest must not be {@literal null}
746+
* @return the {@link Mono} emitting the response
747+
*/
748+
Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest);
749+
750+
/**
751+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
752+
*
753+
* @param consumer must not be {@literal null}
754+
* @return the {@link Mono} emitting the task
755+
*/
756+
default Mono<String> submitReindexTask(Consumer<ReindexRequest> consumer){
757+
758+
ReindexRequest reindexRequest = new ReindexRequest();
759+
consumer.accept(reindexRequest);
760+
return submitReindexTask(reindexRequest);
761+
}
762+
763+
/**
764+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
765+
*
766+
* @param reindexRequest must not be {@literal null}
767+
* @return the {@link Mono} emitting the task
768+
*/
769+
default Mono<String> submitReindexTask(ReindexRequest reindexRequest){
770+
return submitReindexTask(HttpHeaders.EMPTY, reindexRequest);
771+
}
772+
773+
/**
774+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
775+
*
776+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
777+
* @param reindexRequest must not be {@literal null}
778+
* @return the {@link Mono} emitting the task
779+
*/
780+
Mono<String> submitReindexTask(HttpHeaders headers, ReindexRequest reindexRequest);
716781
/**
717782
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
718783
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

+5
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
5050
import org.elasticsearch.client.indices.PutMappingRequest;
5151
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
52+
import org.elasticsearch.index.reindex.ReindexRequest;
5253
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5354
import org.elasticsearch.script.mustache.SearchTemplateRequest;
5455
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
@@ -289,4 +290,8 @@ default Function<GetIndexRequest, Request> getIndex() {
289290
default Function<ClusterHealthRequest, Request> clusterHealth() {
290291
return RequestConverters::clusterHealth;
291292
}
293+
294+
default Function<ReindexRequest, Request> reindex() { return RequestConverters::reindex; }
295+
296+
default Function<ReindexRequest, Request> submitReindex() { return RequestConverters::submitReindex; }
292297
}

Diff for: src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) {
532532
return request;
533533
}
534534

535-
public static Request reindex(ReindexRequest reindexRequest) throws IOException {
535+
public static Request reindex(ReindexRequest reindexRequest) {
536536
return prepareReindexRequest(reindexRequest, true);
537537
}
538538

539-
static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
539+
public static Request submitReindex(ReindexRequest reindexRequest) {
540540
return prepareReindexRequest(reindexRequest, false);
541541
}
542542

@@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
547547
.withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
548548
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond());
549549

550+
if(reindexRequest.getDestination().isRequireAlias()){
551+
params.putParam("require_alias", Boolean.TRUE.toString());
552+
}
550553
if (reindexRequest.getScrollTime() != null) {
551554
params.putParam("scroll", reindexRequest.getScrollTime());
552555
}
556+
params.putParam("slices", Integer.toString(reindexRequest.getSlices()));
557+
if(reindexRequest.getMaxDocs() > -1){
558+
params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs()));
559+
}
553560
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
554561
return request;
555562
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

+23
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.Collection;
1919
import java.util.List;
2020

21+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
22+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
2123
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2224
import org.springframework.data.elasticsearch.core.query.BulkOptions;
2325
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@@ -34,6 +36,7 @@
3436
*
3537
* @author Peter-Josef Meisch
3638
* @author Farid Faoudi
39+
* @author Sijia Liu
3740
* @since 4.0
3841
*/
3942
public interface DocumentOperations {
@@ -322,4 +325,24 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
322325
* @since 4.2
323326
*/
324327
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
328+
329+
/**
330+
* Copies documents from a source to a destination.
331+
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
332+
* For example, you cannot reindex a data stream into itself.
333+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
334+
*
335+
* @param postReindexRequest reindex request parameters
336+
* @return the reindex response
337+
*/
338+
PostReindexResponse reindex(PostReindexRequest postReindexRequest);
339+
340+
/**
341+
* Submits a reindex task.
342+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
343+
*
344+
* @param postReindexRequest reindex request parameters
345+
* @return the task
346+
*/
347+
String submitReindexTask(PostReindexRequest postReindexRequest);
325348
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+24
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import org.elasticsearch.index.query.QueryBuilders;
5454
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5555
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
56+
import org.elasticsearch.index.reindex.ReindexRequest;
5657
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5758
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
5859
import org.elasticsearch.search.suggest.SuggestBuilder;
@@ -61,7 +62,9 @@
6162
import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations;
6263
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
6364
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
65+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
6466
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
67+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
6568
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
6669
import org.springframework.data.elasticsearch.core.query.BulkOptions;
6770
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@@ -106,6 +109,7 @@
106109
* @author Gyula Attila Csorogi
107110
* @author Massimiliano Poggi
108111
* @author Farid Faoudi
112+
* @author Sijia Liu
109113
* @since 4.4
110114
*/
111115
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
@@ -277,6 +281,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index)
277281
return ResponseConverter.byQueryResponseOf(bulkByScrollResponse);
278282
}
279283

284+
@Override
285+
public PostReindexResponse reindex(PostReindexRequest postReindexRequest) {
286+
287+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
288+
289+
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
290+
final BulkByScrollResponse bulkByScrollResponse = execute(
291+
client -> client.reindex(reindexRequest, RequestOptions.DEFAULT));
292+
return ResponseConverter.postReindexResponseOf(bulkByScrollResponse);
293+
}
294+
295+
@Override
296+
public String submitReindexTask(PostReindexRequest postReindexRequest) {
297+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
298+
299+
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
300+
return execute(
301+
client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
302+
}
303+
280304
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
281305
IndexCoordinates index) {
282306
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java

+23
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
19+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022

@@ -38,6 +40,7 @@
3840
* @author Aleksei Arsenev
3941
* @author Roman Puchkovskiy
4042
* @author Farid Faoudi
43+
* @author Sijia Liu
4144
* @since 4.0
4245
*/
4346
public interface ReactiveDocumentOperations {
@@ -302,4 +305,24 @@ default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index)
302305
* @since 4.2
303306
*/
304307
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
308+
309+
/**
310+
* Copies documents from a source to a destination.
311+
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
312+
* For example, you cannot reindex a data stream into itself.
313+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
314+
*
315+
* @param postReindexRequest reindex request parameters
316+
* @return a {@link Mono} emitting the reindex response
317+
*/
318+
Mono<PostReindexResponse> reindex(PostReindexRequest postReindexRequest);
319+
320+
/**
321+
* Submits a reindex task.
322+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
323+
*
324+
* @param postReindexRequest reindex request parameters
325+
* @return a {@link Mono} emitting the {@literal task}.
326+
*/
327+
Mono<String> submitReindexTask(PostReindexRequest postReindexRequest);
305328
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+26
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.index.query.QueryBuilders;
4747
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4848
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
49+
import org.elasticsearch.index.reindex.ReindexRequest;
4950
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5051
import org.elasticsearch.search.suggest.SuggestBuilder;
5152
import org.reactivestreams.Publisher;
@@ -70,6 +71,8 @@
7071
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
7172
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
7273
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
74+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexRequest;
75+
import org.springframework.data.elasticsearch.core.index.reindex.PostReindexResponse;
7376
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
7477
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
7578
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@@ -105,6 +108,7 @@
105108
* @author Russell Parry
106109
* @author Thomas Geese
107110
* @author Farid Faoudi
111+
* @author Sijia Liu
108112
* @since 3.2
109113
*/
110114
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -609,6 +613,28 @@ public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordin
609613
});
610614
}
611615

616+
@Override
617+
public Mono<PostReindexResponse> reindex(PostReindexRequest postReindexRequest) {
618+
619+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
620+
621+
return Mono.defer(() -> {
622+
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
623+
return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::postReindexResponseOf);
624+
});
625+
}
626+
627+
@Override
628+
public Mono<String> submitReindexTask(PostReindexRequest postReindexRequest) {
629+
630+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
631+
632+
return Mono.defer(() -> {
633+
final ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
634+
return Mono.from(execute(client -> client.submitReindexTask(reindexRequest)));
635+
});
636+
}
637+
612638
@Override
613639
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
614640
return delete(query, entityType, getIndexCoordinatesFor(entityType));

0 commit comments

Comments
 (0)