Skip to content

Commit 9854c01

Browse files
committed
Polishing.
1 parent 0ee0164 commit 9854c01

19 files changed

+412
-340
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@
2222
import io.netty.handler.ssl.JdkSslContext;
2323
import io.netty.handler.timeout.ReadTimeoutHandler;
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
25-
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
26-
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
27-
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
28-
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2925
import reactor.core.publisher.Flux;
3026
import reactor.core.publisher.Mono;
3127
import reactor.netty.http.client.HttpClient;
@@ -88,6 +84,8 @@
8884
import org.elasticsearch.action.update.UpdateResponse;
8985
import org.elasticsearch.client.GetAliasesResponse;
9086
import org.elasticsearch.client.Request;
87+
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
88+
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
9189
import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
9290
import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
9391
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
@@ -100,6 +98,7 @@
10098
import org.elasticsearch.index.get.GetResult;
10199
import org.elasticsearch.index.reindex.BulkByScrollResponse;
102100
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
101+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
103102
import org.elasticsearch.rest.BytesRestResponse;
104103
import org.elasticsearch.rest.RestStatus;
105104
import org.elasticsearch.search.SearchHit;
@@ -116,6 +115,7 @@
116115
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
117116
import org.springframework.data.elasticsearch.client.util.NamedXContents;
118117
import org.springframework.data.elasticsearch.client.util.ScrollState;
118+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
119119
import org.springframework.data.util.Lazy;
120120
import org.springframework.http.HttpHeaders;
121121
import org.springframework.http.HttpMethod;
@@ -687,8 +687,10 @@ public Mono<GetMappingsResponse> getMapping(HttpHeaders headers, GetMappingsRequ
687687
}
688688

689689
@Override
690-
public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest) {
691-
return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class, headers).next();
690+
public Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers,
691+
GetFieldMappingsRequest getFieldMappingsRequest) {
692+
return sendRequest(getFieldMappingsRequest, requestCreator.getFieldMapping(), GetFieldMappingsResponse.class,
693+
headers).next();
692694
}
693695

694696
@Override

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+22-21
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
*/
1616
package org.springframework.data.elasticsearch.client.reactive;
1717

18-
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
19-
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
20-
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
21-
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2218
import reactor.core.publisher.Flux;
2319
import reactor.core.publisher.Mono;
2420

@@ -55,18 +51,22 @@
5551
import org.elasticsearch.action.update.UpdateRequest;
5652
import org.elasticsearch.action.update.UpdateResponse;
5753
import org.elasticsearch.client.GetAliasesResponse;
54+
import org.elasticsearch.client.indices.GetFieldMappingsRequest;
55+
import org.elasticsearch.client.indices.GetFieldMappingsResponse;
5856
import org.elasticsearch.client.indices.GetIndexTemplatesRequest;
5957
import org.elasticsearch.client.indices.GetIndexTemplatesResponse;
6058
import org.elasticsearch.client.indices.IndexTemplatesExistRequest;
6159
import org.elasticsearch.client.indices.PutIndexTemplateRequest;
6260
import org.elasticsearch.index.get.GetResult;
6361
import org.elasticsearch.index.reindex.BulkByScrollResponse;
6462
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
63+
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
6564
import org.elasticsearch.search.SearchHit;
6665
import org.elasticsearch.search.aggregations.Aggregation;
6766
import org.elasticsearch.search.suggest.Suggest;
6867
import org.springframework.data.elasticsearch.client.ClientConfiguration;
6968
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
69+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
7070
import org.springframework.http.HttpHeaders;
7171
import org.springframework.util.Assert;
7272
import org.springframework.util.CollectionUtils;
@@ -601,12 +601,12 @@ default Mono<BulkByScrollResponse> deleteBy(DeleteByQueryRequest deleteRequest)
601601
/**
602602
* Execute a {@link UpdateByQueryRequest} against the {@literal update by query} API.
603603
*
604-
* @param consumer never {@literal null}.
604+
* @param consumer must not be {@literal null}.
605605
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
606-
* * Query API on elastic.co</a>
606+
* * Query API on elastic.co</a>
607607
* @return a {@link Mono} emitting operation response.
608608
*/
609-
default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer){
609+
default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> consumer) {
610610

611611
final UpdateByQueryRequest request = new UpdateByQueryRequest();
612612
consumer.accept(request);
@@ -618,10 +618,10 @@ default Mono<UpdateByQueryResponse> updateBy(Consumer<UpdateByQueryRequest> cons
618618
*
619619
* @param updateRequest must not be {@literal null}.
620620
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
621-
* * Query API on elastic.co</a>
621+
* * Query API on elastic.co</a>
622622
* @return a {@link Mono} emitting operation response.
623623
*/
624-
default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest){
624+
default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest) {
625625
return updateBy(HttpHeaders.EMPTY, updateRequest);
626626
}
627627

@@ -631,7 +631,7 @@ default Mono<UpdateByQueryResponse> updateBy(UpdateByQueryRequest updateRequest)
631631
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
632632
* @param updateRequest must not be {@literal null}.
633633
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">Update By
634-
* * Query API on elastic.co</a>
634+
* * Query API on elastic.co</a>
635635
* @return a {@link Mono} emitting operation response.
636636
*/
637637
Mono<UpdateByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest updateRequest);
@@ -1210,9 +1210,9 @@ default Mono<GetMappingsResponse> getMapping(GetMappingsRequest getMappingsReque
12101210
*
12111211
* @param consumer never {@literal null}.
12121212
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
1213-
* does not exist.
1214-
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html"> Indices
1215-
* Flush API on elastic.co</a>
1213+
* does not exist.
1214+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html">
1215+
* Indices Flush API on elastic.co</a>
12161216
* @since 4.2
12171217
*/
12181218
default Mono<GetFieldMappingsResponse> getFieldMapping(Consumer<GetFieldMappingsRequest> consumer) {
@@ -1228,8 +1228,8 @@ default Mono<GetFieldMappingsResponse> getFieldMapping(Consumer<GetFieldMappings
12281228
* @param getFieldMappingsRequest must not be {@literal null}.
12291229
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
12301230
* does not exist.
1231-
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html"> Indices
1232-
* Flush API on elastic.co</a>
1231+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html">
1232+
* Indices Flush API on elastic.co</a>
12331233
* @since 4.2
12341234
*/
12351235
default Mono<GetFieldMappingsResponse> getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest) {
@@ -1243,15 +1243,16 @@ default Mono<GetFieldMappingsResponse> getFieldMapping(GetFieldMappingsRequest g
12431243
* @param getFieldMappingsRequest must not be {@literal null}.
12441244
* @return a {@link Mono} signalling operation completion or an {@link Mono#error(Throwable) error} if eg. the index
12451245
* does not exist.
1246-
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html"> Indices
1247-
* Flush API on elastic.co</a>
1246+
* @see <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-get-field-mapping.html">
1247+
* Indices Flush API on elastic.co</a>
12481248
* @since 4.2
12491249
*/
1250-
Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers, GetFieldMappingsRequest getFieldMappingsRequest);
1250+
Mono<GetFieldMappingsResponse> getFieldMapping(HttpHeaders headers,
1251+
GetFieldMappingsRequest getFieldMappingsRequest);
12511252

12521253
/**
12531254
* Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API.
1254-
*
1255+
*
12551256
* @param consumer never {@literal null}.
12561257
* @return a {@link Mono} signalling operation completion.
12571258
* @since 4.1
@@ -1264,7 +1265,7 @@ default Mono<Boolean> updateAliases(Consumer<IndicesAliasesRequest> consumer) {
12641265

12651266
/**
12661267
* Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API.
1267-
*
1268+
*
12681269
* @param indicesAliasesRequest must not be {@literal null}
12691270
* @return a {@link Mono} signalling operation completion.
12701271
* @since 4.1
@@ -1275,7 +1276,7 @@ default Mono<Boolean> updateAliases(IndicesAliasesRequest indicesAliasesRequest)
12751276

12761277
/**
12771278
* Execute the given {@link IndicesAliasesRequest} against the {@literal indices} API.
1278-
*
1279+
*
12791280
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
12801281
* @param indicesAliasesRequest must not be {@literal null}
12811282
* @return a {@link Mono} signalling operation completion.

src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

+3
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ default Function<DeleteByQueryRequest, Request> deleteByQuery() {
9292
return RequestConverters::deleteByQuery;
9393
}
9494

95+
/**
96+
* @since 4.2
97+
*/
9598
default Function<UpdateByQueryRequest, Request> updateByQuery() {
9699
return RequestConverters::updateByQuery;
97100
}

src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

+11-9
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ public static Request search(SearchRequest searchRequest) {
413413

414414
/**
415415
* Creates a count request.
416-
*
416+
*
417417
* @param countRequest the search defining the data to be counted
418418
* @return Elasticsearch count request
419419
* @since 4.0
@@ -538,10 +538,9 @@ private static Request prepareReindexRequest(ReindexRequest reindexRequest, bool
538538
}
539539

540540
public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
541-
String endpoint = endpoint(updateByQueryRequest.indices(), updateByQueryRequest.getDocTypes(), "_update_by_query");
541+
String endpoint = endpoint(updateByQueryRequest.indices(), "_update_by_query");
542542
Request request = new Request(HttpMethod.POST.name(), endpoint);
543-
Params params = new Params(request)
544-
.withRouting(updateByQueryRequest.getRouting()) //
543+
Params params = new Params(request).withRouting(updateByQueryRequest.getRouting()) //
545544
.withPipeline(updateByQueryRequest.getPipeline()) //
546545
.withRefresh(updateByQueryRequest.isRefresh()) //
547546
.withTimeout(updateByQueryRequest.getTimeout()) //
@@ -552,15 +551,19 @@ public static Request updateByQuery(UpdateByQueryRequest updateByQueryRequest) {
552551
if (!updateByQueryRequest.isAbortOnVersionConflict()) {
553552
params.putParam("conflicts", "proceed");
554553
}
554+
555555
if (updateByQueryRequest.getBatchSize() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_SIZE) {
556556
params.putParam("scroll_size", Integer.toString(updateByQueryRequest.getBatchSize()));
557557
}
558+
558559
if (updateByQueryRequest.getScrollTime() != AbstractBulkByScrollRequest.DEFAULT_SCROLL_TIMEOUT) {
559560
params.putParam("scroll", updateByQueryRequest.getScrollTime());
560561
}
562+
561563
if (updateByQueryRequest.getMaxDocs() > 0) {
562564
params.putParam("max_docs", Integer.toString(updateByQueryRequest.getMaxDocs()));
563565
}
566+
564567
request.setEntity(createEntity(updateByQueryRequest, REQUEST_BODY_CONTENT_TYPE));
565568
return request;
566569
}
@@ -897,13 +900,12 @@ public static Request deleteTemplate(DeleteIndexTemplateRequest deleteIndexTempl
897900
}
898901

899902
public static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRequest) {
900-
String[] indices = getFieldMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.indices();
903+
String[] indices = getFieldMappingsRequest.indices() == null ? Strings.EMPTY_ARRAY
904+
: getFieldMappingsRequest.indices();
901905
String[] fields = getFieldMappingsRequest.fields() == null ? Strings.EMPTY_ARRAY : getFieldMappingsRequest.fields();
902906

903-
final String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(indices)
904-
.addPathPartAsIs("_mapping").addPathPartAsIs("field")
905-
.addCommaSeparatedPathParts(fields)
906-
.build();
907+
final String endpoint = new EndpointBuilder().addCommaSeparatedPathParts(indices).addPathPartAsIs("_mapping")
908+
.addPathPartAsIs("field").addCommaSeparatedPathParts(fields).build();
907909

908910
Request request = new Request(HttpMethod.GET.name(), endpoint);
909911

src/main/java/org/springframework/data/elasticsearch/core/AbstractElasticsearchTemplate.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.Iterator;
2323
import java.util.List;
2424
import java.util.Map;
25-
import java.util.Objects;
2625
import java.util.stream.Collectors;
2726
import java.util.stream.Stream;
2827

@@ -373,7 +372,7 @@ protected <R extends WriteRequest<R>> R prepareWriteRequest(R request) {
373372
return request;
374373
}
375374

376-
return request.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
375+
return request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy));
377376
}
378377

379378
/**
@@ -390,7 +389,7 @@ protected <R extends WriteRequestBuilder<R>> R prepareWriteRequestBuilder(R requ
390389
return requestBuilder;
391390
}
392391

393-
return requestBuilder.setRefreshPolicy(refreshPolicy.toRequestRefreshPolicy());
392+
return requestBuilder.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(refreshPolicy));
394393
}
395394

396395
// endregion

src/main/java/org/springframework/data/elasticsearch/core/DocumentOperations.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@
1818
import java.util.List;
1919

2020
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
21-
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2221
import org.springframework.data.elasticsearch.core.query.BulkOptions;
2322
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
2423
import org.springframework.data.elasticsearch.core.query.GetQuery;
2524
import org.springframework.data.elasticsearch.core.query.IndexQuery;
2625
import org.springframework.data.elasticsearch.core.query.Query;
26+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
2727
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
2828
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
2929
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
@@ -208,7 +208,7 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
208208

209209
/**
210210
* Bulk update all objects. Will do update.
211-
*
211+
*
212212
* @param clazz the entity class
213213
* @param queries the queries to execute in bulk
214214
* @since 4.1
@@ -304,8 +304,8 @@ default void bulkUpdate(List<UpdateQuery> queries, IndexCoordinates index) {
304304
/**
305305
* Update document(s) by query
306306
*
307-
* @param updateQuery query defining the update
308-
* @param index the index where to update the records
307+
* @param updateQuery query defining the update, must not be {@literal null}
308+
* @param index the index where to update the records , must not be {@literal null}
309309
* @return the update response
310310
* @since 4.2
311311
*/

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchRestTemplate.java

+29-5
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,11 @@
4848
import org.springframework.data.elasticsearch.core.document.DocumentAdapters;
4949
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
5050
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
51-
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
5251
import org.springframework.data.elasticsearch.core.query.BulkOptions;
5352
import org.springframework.data.elasticsearch.core.query.DeleteQuery;
5453
import org.springframework.data.elasticsearch.core.query.IndexQuery;
5554
import org.springframework.data.elasticsearch.core.query.Query;
55+
import org.springframework.data.elasticsearch.core.query.UpdateByQueryResponse;
5656
import org.springframework.data.elasticsearch.core.query.UpdateQuery;
5757
import org.springframework.data.elasticsearch.core.query.UpdateResponse;
5858
import org.springframework.data.elasticsearch.support.SearchHitsUtil;
@@ -162,7 +162,7 @@ public String doIndex(IndexQuery query, IndexCoordinates index) {
162162
@Override
163163
@Nullable
164164
public <T> T get(String id, Class<T> clazz, IndexCoordinates index) {
165-
GetRequest request = requestFactory.getRequest(id,routingResolver.getRouting(), index);
165+
GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(), index);
166166
GetResponse response = execute(client -> client.get(request, RequestOptions.DEFAULT));
167167

168168
DocumentCallback<T> callback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
@@ -184,7 +184,7 @@ public <T> List<T> multiGet(Query query, Class<T> clazz, IndexCoordinates index)
184184

185185
@Override
186186
protected boolean doExists(String id, IndexCoordinates index) {
187-
GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(),index);
187+
GetRequest request = requestFactory.getRequest(id, routingResolver.getRouting(), index);
188188
request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);
189189
return execute(client -> client.get(request, RequestOptions.DEFAULT).isExists());
190190
}
@@ -225,15 +225,39 @@ public void delete(DeleteQuery deleteQuery, IndexCoordinates index) {
225225
@Override
226226
public UpdateResponse update(UpdateQuery query, IndexCoordinates index) {
227227
UpdateRequest request = requestFactory.updateRequest(query, index);
228-
UpdateResponse.Result result = UpdateResponse.Result
228+
229+
if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) {
230+
request.setRefreshPolicy(RequestFactory.toElasticsearchRefreshPolicy(getRefreshPolicy()));
231+
}
232+
233+
if (query.getRouting() == null && routingResolver.getRouting() != null) {
234+
request.routing(routingResolver.getRouting());
235+
}
236+
237+
UpdateResponse.Result result = UpdateResponse.Result
229238
.valueOf(execute(client -> client.update(request, RequestOptions.DEFAULT)).getResult().name());
230239
return new UpdateResponse(result);
231240
}
232241

233242
@Override
234243
public UpdateByQueryResponse updateByQuery(UpdateQuery query, IndexCoordinates index) {
244+
245+
Assert.notNull(query, "query must not be null");
246+
Assert.notNull(index, "index must not be null");
247+
235248
final UpdateByQueryRequest updateByQueryRequest = requestFactory.updateByQueryRequest(query, index);
236-
final BulkByScrollResponse bulkByScrollResponse = execute(client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
249+
250+
if (query.getRefreshPolicy() == null && getRefreshPolicy() != null) {
251+
updateByQueryRequest.setRefresh(getRefreshPolicy() == RefreshPolicy.IMMEDIATE);
252+
}
253+
254+
255+
if (query.getRouting() == null && routingResolver.getRouting() != null) {
256+
updateByQueryRequest.setRouting(routingResolver.getRouting());
257+
}
258+
259+
final BulkByScrollResponse bulkByScrollResponse = execute(
260+
client -> client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT));
237261
return UpdateByQueryResponse.of(bulkByScrollResponse);
238262
}
239263

0 commit comments

Comments
 (0)