Skip to content

Commit 0ee0164

Browse files
author
Farid Faoudi
authored
Implement update by query.
Original Pull Request spring-projects#1446 Closes spring-projects#1644
1 parent 169015f commit 0ee0164

17 files changed

+1119
-38
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+9
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
2525
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
2626
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
27+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
28+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2729
import reactor.core.publisher.Flux;
2830
import reactor.core.publisher.Mono;
2931
import reactor.netty.http.client.HttpClient;
@@ -521,6 +523,13 @@ public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryReq
521523
.next();
522524
}
523525

526+
@Override
527+
public Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest) {
528+
return sendRequest(updateRequest, requestCreator.updateByQuery(), BulkByScrollResponse.class, headers) //
529+
.next() //
530+
.map(UpdateByQueryResponse::of);
531+
}
532+
524533
/*
525534
* (non-Javadoc)
526535
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+40
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
1919
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
20+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
21+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2022
import reactor.core.publisher.Flux;
2123
import reactor.core.publisher.Mono;
2224

@@ -596,6 +598,44 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
596598
*/
597599
Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest);
598600

601+
/**
602+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
603+
*
604+
* @param consumer never {@literal null}.
605+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
606+
* * Query API on elastic.co</a>
607+
* @return a {@link Mono} emitting operation response.
608+
*/
609+
default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer){
610+
611+
final UpdateByQueryRequest request = new UpdateByQueryRequest();
612+
consumer.accept(request);
613+
return updateBy(request);
614+
}
615+
616+
/**
617+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
618+
*
619+
* @param updateRequest must not be {@literal null}.
620+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
621+
* * Query API on elastic.co</a>
622+
* @return a {@link Mono} emitting operation response.
623+
*/
624+
default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest){
625+
return updateBy(HttpHeaders.EMPTY, updateRequest);
626+
}
627+
628+
/**
629+
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
630+
*
631+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
632+
* @param updateRequest must not be {@literal null}.
633+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
634+
* * Query API on elastic.co</a>
635+
* @return a {@link Mono} emitting operation response.
636+
*/
637+
Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
638+
599639
/**
600640
* Execute a {@link BulkRequest} against the {@literal bulk} API.
601641
*

src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
3434
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
3535
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
36+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
3637
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
3738
import org.springframework.data.elasticsearch.client.util.RequestConverters;
3839

@@ -91,6 +92,10 @@ default Function<DeleteByQueryRequest, Request> deleteByQuery() {
9192
return RequestConverters::deleteByQuery;
9293
}
9394

95+
default Function<UpdateByQueryRequest, Request> updateByQuery() {
96+
return RequestConverters::updateByQuery;
97+
}
98+
9499
default Function<BulkRequest, Request> bulk() {
95100

96101
return request -> {

src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -540,12 +540,15 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
540540
public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
541541
String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
542542
Request request = new Request(HttpMethod.POST.name(), endpoint);
543-
Params params = new Params(request).withRouting(updateByQueryRequest.getRouting())
544-
.withPipeline(updateByQueryRequest.getPipeline()).withRefresh(updateByQueryRequest.isRefresh())
545-
.withTimeout(updateByQueryRequest.getTimeout())
546-
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards())
547-
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond())
548-
.withIndicesOptions(updateByQueryRequest.indicesOptions());
543+
Params params = new Params(request)
544+
.withRouting(updateByQueryRequest.getRouting()) //
545+
.withPipeline(updateByQueryRequest.getPipeline()) //
546+
.withRefresh(updateByQueryRequest.isRefresh()) //
547+
.withTimeout(updateByQueryRequest.getTimeout()) //
548+
.withWaitForActiveShards(updateByQueryRequest.getWaitForActiveShards()) //
549+
.withRequestsPerSecond(updateByQueryRequest.getRequestsPerSecond()) //
550+
.withIndicesOptions(updateByQueryRequest.indicesOptions()); //
551+
549552
if (!updateByQueryRequest.isAbortOnVersionConflict()) {
550553
params.putParam("conflicts", "proceed");
551554
}
@@ -555,8 +558,8 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
555558
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
556559
params.putParam("scroll", updateByQueryRequest.getScrollTime());
557560
}
558-
if (updateByQueryRequest.getSize() > 0) {
559-
params.putParam("size", Integer.toString(updateByQueryRequest.getSize()));
561+
if (updateByQueryRequest.getMaxDocs() > 0) {
562+
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
560563
}
561564
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
562565
return request;

src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

+12
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.List;
1919

2020
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
21+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2122
import org.springframework.data.elasticsearch.core.query.BulkOptions;
2223
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
2324
import org.springframework.data.elasticsearch.core.query.GetQuery;
@@ -33,6 +34,7 @@
3334
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs.html">Elasticsearch Document APIs</a>.
3435
*
3536
* @author Peter-Josef Meisch
37+
* @author Farid Faoudi
3638
* @since 4.0
3739
*/
3840
public interface DocumentOperations {
@@ -299,6 +301,16 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
299301
*/
300302
UpdateResponse update(UpdateQuery updateQuery, IndexCoordinates index);
301303

304+
/**
305+
* Update document(s) by query
306+
*
307+
* @param updateQuery query defining the update
308+
* @param index the index where to update the records
309+
* @return the update response
310+
* @since 4.2
311+
*/
312+
UpdateByQueryResponse updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
313+
302314
// region deprecated
303315
/**
304316
* Delete all records matching the query.

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+11
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737
import org.elasticsearch.client.RequestOptions;
3838
import org.elasticsearch.client.RestHighLevelClient;
3939
import org.elasticsearch.common.unit.TimeValue;
40+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
4041
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
42+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
4143
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
4244
import org.elasticsearch.search.suggest.SuggestBuilder;
4345
import org.slf4j.Logger;
@@ -46,6 +48,7 @@
4648
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4749
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
4850
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
51+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
4952
import org.springframework.data.elasticsearch.core.query.BulkOptions;
5053
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
5154
import org.springframework.data.elasticsearch.core.query.IndexQuery;
@@ -87,6 +90,7 @@
8790
* @author Mathias Teier
8891
* @author Gyula Attila Csorogi
8992
* @author Massimiliano Poggi
93+
* @author Farid Faoudi
9094
*/
9195
public class ElasticsearchRestTemplate extends AbstractElasticsearchTemplate {
9296

@@ -226,6 +230,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
226230
return new UpdateResponse(result);
227231
}
228232

233+
@Override
234+
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
235+
final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index);
236+
final BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
237+
return UpdateByQueryResponse.of(bulkByScrollResponse);
238+
}
239+
229240
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
230241
IndexCoordinates index) {
231242
BulkRequest bulkRequest = prepareWriteRequest(requestFactory.bulkRequest(queries, bulkOptions, index));

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java

+11
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
import org.elasticsearch.action.update.UpdateRequestBuilder;
3737
import org.elasticsearch.client.Client;
3838
import org.elasticsearch.common.unit.TimeValue;
39+
import org.elasticsearch.index.reindex.BulkByScrollResponse;
40+
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
3941
import org.elasticsearch.search.suggest.SuggestBuilder;
4042
import org.slf4j.Logger;
4143
import org.slf4j.LoggerFactory;
@@ -44,6 +46,7 @@
4446
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4547
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
4648
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
49+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
4750
import org.springframework.data.elasticsearch.core.query.BulkOptions;
4851
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
4952
import org.springframework.data.elasticsearch.core.query.IndexQuery;
@@ -81,6 +84,7 @@
8184
* @author Farid Azaza
8285
* @author Gyula Attila Csorogi
8386
* @author Roman Puchkovskiy
87+
* @author Farid Faoudi
8488
* @deprecated as of 4.0
8589
*/
8690
@Deprecated
@@ -251,6 +255,13 @@ public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
251255
return new UpdateResponse(result);
252256
}
253257

258+
@Override
259+
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
260+
final UpdateByQueryRequestBuilder updateByQueryRequestBuilder = requestFactory.updateByQueryRequestBuilder(client, query, index);
261+
final BulkByScrollResponse bulkByScrollResponse = updateByQueryRequestBuilder.execute().actionGet();
262+
return UpdateByQueryResponse.of(bulkByScrollResponse);
263+
}
264+
254265
public List<IndexedObjectInformation> doBulkOperation(List<?> queries, BulkOptions bulkOptions,
255266
IndexCoordinates index) {
256267
BulkRequestBuilder bulkRequestBuilder = requestFactory.bulkRequestBuilder(client, queries, bulkOptions, index);

src/main/java/org/springframework/data/elasticsearch/core/ReactiveDocumentOperations.java

+11
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
1819
import reactor.core.publisher.Flux;
1920
import reactor.core.publisher.Mono;
2021

@@ -36,6 +37,7 @@
3637
* @author Peter-Josef Meisch
3738
* @author Aleksei Arsenev
3839
* @author Roman Puchkovskiy
40+
* @author Farid Faoudi
3941
* @since 4.0
4042
*/
4143
public interface ReactiveDocumentOperations {
@@ -336,4 +338,13 @@ default Mono<String> delete(String id, Class<?> entityType, IndexCoordinates ind
336338
* @since 4.1
337339
*/
338340
Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index);
341+
342+
/**
343+
* Update document(s) by query.
344+
* @param updateQuery query defining the update
345+
* @param index the index where to update the records
346+
* @return a {@link Mono} emitting the update response
347+
* @since 4.2
348+
*/
349+
Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index);
339350
}

src/main/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchTemplate.java

+15
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.elasticsearch.core;
1717

18+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
19+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022
import reactor.util.function.Tuple2;
@@ -97,6 +99,7 @@
9799
* @author Roman Puchkovskiy
98100
* @author Russell Parry
99101
* @author Thomas Geese
102+
* @author Farid Faoudi
100103
* @since 3.2
101104
*/
102105
public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOperations, ApplicationContextAware {
@@ -558,6 +561,18 @@ public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates ind
558561
});
559562
}
560563

564+
@Override
565+
public Mono<UpdateByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
566+
567+
Assert.notNull(updateQuery, "UpdateQuery must not be null");
568+
Assert.notNull(index, "Index must not be null");
569+
570+
return Mono.defer(() -> {
571+
final UpdateByQueryRequest request = requestFactory.updateByQueryRequest(updateQuery, index);
572+
return Mono.from(execute(client -> client.updateBy(request)));
573+
});
574+
}
575+
561576
@Override
562577
public Mono<Long> delete(Query query, Class<?> entityType) {
563578
return delete(query, entityType, getIndexCoordinatesFor(entityType));

0 commit comments

Comments
 (0)