Skip to content

Commit c5db583

Browse files
authored
Implement support for reindex API.
Original Pull Request #2070 Closes #1529
1 parent cf3e46b commit c5db583

17 files changed

+1479
-20
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+16
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,13 @@
8383
import org.elasticsearch.client.GetAliasesResponse;
8484
import org.elasticsearch.client.Request;
8585
import org.elasticsearch.client.indices.*;
86+
import org.elasticsearch.client.tasks.TaskSubmissionResponse;
8687
import org.elasticsearch.core.TimeValue;
8788
import org.elasticsearch.index.get.GetResult;
8889
import org.elasticsearch.index.reindex.BulkByScrollResponse;
8990
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
9091
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
92+
import org.elasticsearch.index.reindex.ReindexRequest;
9193
import org.elasticsearch.rest.BytesRestResponse;
9294
import org.elasticsearch.rest.RestStatus;
9395
import org.elasticsearch.script.mustache.SearchTemplateRequest;
@@ -143,6 +145,7 @@
143145
* @author Brian Clozel
144146
* @author Farid Faoudi
145147
* @author George Popides
148+
* @author Sijia Liu
146149
* @since 3.2
147150
* @see ClientConfiguration
148151
* @see ReactiveRestClients
@@ -509,6 +512,19 @@ public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
509512
.next();
510513
}
511514

515+
@Override
516+
public Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest) {
517+
return sendRequest(reindexRequest, requestCreator.reindex(), BulkByScrollResponse.class, headers)
518+
.next();
519+
}
520+
521+
@Override
522+
public Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest) {
523+
return sendRequest(reindexRequest, requestCreator.submitReindex(), TaskSubmissionResponse.class, headers)
524+
.next()
525+
.map(TaskSubmissionResponse::getTask);
526+
}
527+
512528
@Override
513529
public <T> Mono<T> execute(ReactiveElasticsearchClientCallback<T> callback) {
514530

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+71
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.index.reindex.BulkByScrollResponse;
5555
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
5656
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
57+
import org.elasticsearch.index.reindex.ReindexRequest;
5758
import org.elasticsearch.script.mustache.SearchTemplateRequest;
5859
import org.elasticsearch.search.SearchHit;
5960
import org.elasticsearch.search.aggregations.Aggregation;
@@ -76,6 +77,7 @@
7677
* @author Henrique Amaral
7778
* @author Thomas Geese
7879
* @author Farid Faoudi
80+
* @author Sijia Liu
7981
* @since 3.2
8082
* @see ClientConfiguration
8183
* @see ReactiveRestClients
@@ -713,6 +715,75 @@ default Mono<BulkResponse> bulk(BulkRequest bulkRequest) {
713715
*/
714716
Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest);
715717

718+
/**
719+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
720+
*
721+
* @param consumer must not be {@literal null}
722+
* @return the {@link Mono} emitting the response
723+
* @since 4.4
724+
*/
725+
default Mono<BulkByScrollResponse> reindex(Consumer<ReindexRequest> consumer){
726+
727+
ReindexRequest reindexRequest = new ReindexRequest();
728+
consumer.accept(reindexRequest);
729+
return reindex(reindexRequest);
730+
}
731+
732+
/**
733+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
734+
*
735+
* @param reindexRequest must not be {@literal null}
736+
* @return the {@link Mono} emitting the response
737+
* @since 4.4
738+
*/
739+
default Mono<BulkByScrollResponse> reindex(ReindexRequest reindexRequest){
740+
return reindex(HttpHeaders.EMPTY, reindexRequest);
741+
}
742+
743+
/**
744+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
745+
*
746+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
747+
* @param reindexRequest must not be {@literal null}
748+
* @return the {@link Mono} emitting the response
749+
* @since 4.4
750+
*/
751+
Mono<BulkByScrollResponse> reindex(HttpHeaders headers, ReindexRequest reindexRequest);
752+
753+
/**
754+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
755+
*
756+
* @param consumer must not be {@literal null}
757+
* @return the {@link Mono} emitting the task id
758+
* @since 4.4
759+
*/
760+
default Mono<String> submitReindex(Consumer<ReindexRequest> consumer){
761+
762+
ReindexRequest reindexRequest = new ReindexRequest();
763+
consumer.accept(reindexRequest);
764+
return submitReindex(reindexRequest);
765+
}
766+
767+
/**
768+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
769+
*
770+
* @param reindexRequest must not be {@literal null}
771+
* @return the {@link Mono} emitting the task id
772+
* @since 4.4
773+
*/
774+
default Mono<String> submitReindex(ReindexRequest reindexRequest){
775+
return submitReindex(HttpHeaders.EMPTY, reindexRequest);
776+
}
777+
778+
/**
779+
* Execute the given {@link ReindexRequest} against the {@literal reindex} API.
780+
*
781+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
782+
* @param reindexRequest must not be {@literal null}
783+
* @return the {@link Mono} emitting the task id
784+
* @since 4.4
785+
*/
786+
Mono<String> submitReindex(HttpHeaders headers, ReindexRequest reindexRequest);
716787
/**
717788
* Compose the actual command/s to run against Elasticsearch using the underlying {@link WebClient connection}.
718789
* {@link #execute(ReactiveElasticsearchClientCallback) Execute} selects an active server from the available ones and

Diff for: src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

+11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
5050
import org.elasticsearch.client.indices.PutMappingRequest;
5151
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
52+
import org.elasticsearch.index.reindex.ReindexRequest;
5253
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
5354
import org.elasticsearch.script.mustache.SearchTemplateRequest;
5455
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
@@ -289,4 +290,14 @@ default Function<GetIndexRequest, Request> getIndex() {
289290
default Function<ClusterHealthRequest, Request> clusterHealth() {
290291
return RequestConverters::clusterHealth;
291292
}
293+
294+
/**
295+
* @since 4.4
296+
*/
297+
default Function<ReindexRequest, Request> reindex() { return RequestConverters::reindex; }
298+
299+
/**
300+
* @since 4.4
301+
*/
302+
default Function<ReindexRequest, Request> submitReindex() { return RequestConverters::submitReindex; }
292303
}

Diff for: src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,11 @@ public static Request rankEval(RankEvalRequest rankEvalRequest) {
532532
return request;
533533
}
534534

535-
public static Request reindex(ReindexRequest reindexRequest) throws IOException {
535+
public static Request reindex(ReindexRequest reindexRequest) {
536536
return prepareReindexRequest(reindexRequest, true);
537537
}
538538

539-
static Request submitReindex(ReindexRequest reindexRequest) throws IOException {
539+
public static Request submitReindex(ReindexRequest reindexRequest) {
540540
return prepareReindexRequest(reindexRequest, false);
541541
}
542542

@@ -547,9 +547,16 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
547547
.withTimeout(reindexRequest.getTimeout()).withWaitForActiveShards(reindexRequest.getWaitForActiveShards())
548548
.withRequestsPerSecond(reindexRequest.getRequestsPerSecond());
549549

550+
if(reindexRequest.getDestination().isRequireAlias()){
551+
params.putParam("require_alias", Boolean.TRUE.toString());
552+
}
550553
if (reindexRequest.getScrollTime() != null) {
551554
params.putParam("scroll", reindexRequest.getScrollTime());
552555
}
556+
params.putParam("slices", Integer.toString(reindexRequest.getSlices()));
557+
if(reindexRequest.getMaxDocs() > -1){
558+
params.putParam("max_docs", Integer.toString(reindexRequest.getMaxDocs()));
559+
}
553560
request.setEntity(createEntity(reindexRequest, REQUEST_BODY_CONTENT_TYPE));
554561
return request;
555562
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

+25
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import java.util.Collection;
1919
import java.util.List;
2020

21+
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
22+
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
2123
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2224
import org.springframework.data.elasticsearch.core.query.BulkOptions;
2325
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@@ -34,6 +36,7 @@
3436
*
3537
* @author Peter-Josef Meisch
3638
* @author Farid Faoudi
39+
* @author Sijia Liu
3740
* @since 4.0
3841
*/
3942
public interface DocumentOperations {
@@ -322,4 +325,26 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
322325
* @since 4.2
323326
*/
324327
ByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
328+
329+
/**
330+
* Copies documents from a source to a destination.
331+
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
332+
* For example, you cannot reindex a data stream into itself.
333+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
334+
*
335+
* @param reindexRequest reindex request parameters
336+
* @return the reindex response
337+
* @since 4.4
338+
*/
339+
ReindexResponse reindex(ReindexRequest reindexRequest);
340+
341+
/**
342+
* Submits a reindex task.
343+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
344+
*
345+
* @param reindexRequest reindex request parameters
346+
* @return the task id
347+
* @since 4.4
348+
*/
349+
String submitReindex(ReindexRequest reindexRequest);
325350
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+23
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@
6161
import org.springframework.data.elasticsearch.core.cluster.ElasticsearchClusterOperations;
6262
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
6363
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
64+
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
6465
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
66+
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
6567
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
6668
import org.springframework.data.elasticsearch.core.query.BulkOptions;
6769
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
@@ -106,6 +108,7 @@
106108
* @author Gyula Attila Csorogi
107109
* @author Massimiliano Poggi
108110
* @author Farid Faoudi
111+
* @author Sijia Liu
109112
* @since 4.4
110113
*/
111114
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
@@ -277,6 +280,26 @@ public ByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index)
277280
return ResponseConverter.byQueryResponseOf(bulkByScrollResponse);
278281
}
279282

283+
@Override
284+
public ReindexResponse reindex(ReindexRequest postReindexRequest) {
285+
286+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
287+
288+
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
289+
final BulkByScrollResponse bulkByScrollResponse = execute(
290+
client -> client.reindex(reindexRequest, RequestOptions.DEFAULT));
291+
return ResponseConverter.reindexResponseOf(bulkByScrollResponse);
292+
}
293+
294+
@Override
295+
public String submitReindex(ReindexRequest postReindexRequest) {
296+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
297+
298+
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
299+
return execute(
300+
client -> client.submitReindexTask(reindexRequest, RequestOptions.DEFAULT).getTask());
301+
}
302+
280303
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
281304
IndexCoordinates index) {
282305
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java

+25
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
19+
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022

@@ -38,6 +40,7 @@
3840
* @author Aleksei Arsenev
3941
* @author Roman Puchkovskiy
4042
* @author Farid Faoudi
43+
* @author Sijia Liu
4144
* @since 4.0
4245
*/
4346
public interface ReactiveDocumentOperations {
@@ -302,4 +305,26 @@ default Mono<Void> bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index)
302305
* @since 4.2
303306
*/
304307
Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
308+
309+
/**
310+
* Copies documents from a source to a destination.
311+
* The source can be any existing index, alias, or data stream. The destination must differ from the source.
312+
* For example, you cannot reindex a data stream into itself.
313+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
314+
*
315+
* @param reindexRequest reindex request parameters
316+
* @return a {@link Mono} emitting the reindex response
317+
* @since 4.4
318+
*/
319+
Mono<ReindexResponse> reindex(ReindexRequest reindexRequest);
320+
321+
/**
322+
* Submits a reindex task.
323+
* (@see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html)
324+
*
325+
* @param reindexRequest reindex request parameters
326+
* @return a {@link Mono} emitting the {@literal task} id.
327+
* @since 4.4
328+
*/
329+
Mono<String> submitReindex(ReindexRequest reindexRequest);
305330
}

Diff for: src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+25
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@
7070
import org.springframework.data.elasticsearch.core.event.ReactiveAfterLoadCallback;
7171
import org.springframework.data.elasticsearch.core.event.ReactiveAfterSaveCallback;
7272
import org.springframework.data.elasticsearch.core.event.ReactiveBeforeConvertCallback;
73+
import org.springframework.data.elasticsearch.core.reindex.ReindexRequest;
74+
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
7375
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
7476
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty;
7577
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
@@ -105,6 +107,7 @@
105107
* @author Russell Parry
106108
* @author Thomas Geese
107109
* @author Farid Faoudi
110+
* @author Sijia Liu
108111
* @since 3.2
109112
*/
110113
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -609,6 +612,28 @@ public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordin
609612
});
610613
}
611614

615+
@Override
616+
public Mono<ReindexResponse> reindex(ReindexRequest postReindexRequest) {
617+
618+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
619+
620+
return Mono.defer(() -> {
621+
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
622+
return Mono.from(execute(client -> client.reindex(reindexRequest))).map(ResponseConverter::reindexResponseOf);
623+
});
624+
}
625+
626+
@Override
627+
public Mono<String> submitReindex(ReindexRequest postReindexRequest) {
628+
629+
Assert.notNull(postReindexRequest, "postReindexRequest must not be null");
630+
631+
return Mono.defer(() -> {
632+
final org.elasticsearch.index.reindex.ReindexRequest reindexRequest = requestFactory.reindexRequest(postReindexRequest);
633+
return Mono.from(execute(client -> client.submitReindex(reindexRequest)));
634+
});
635+
}
636+
612637
@Override
613638
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType) {
614639
return delete(query, entityType, getIndexCoordinatesFor(entityType));

0 commit comments

Comments
 (0)