diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java index 1afaea91f..bf3a83e8f 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java @@ -306,7 +306,7 @@ public long count(Query query, @Nullable Class clazz, IndexCoordinates index) Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true, false); + SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true); SearchResponse searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class)); @@ -319,7 +319,7 @@ public SearchHits search(Query query, Class clazz, IndexCoordinates in Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false); + SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false); SearchResponse searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class)); ReadDocumentCallback readDocumentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index fe2e782b8..249cf84ba 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -19,7 +19,6 @@ import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*; import co.elastic.clients.elasticsearch._types.Result; -import co.elastic.clients.elasticsearch._types.Time; import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem; import co.elastic.clients.elasticsearch.core.get.GetResult; @@ -35,14 +34,19 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.domain.Sort; import org.springframework.data.elasticsearch.BulkFailureException; import org.springframework.data.elasticsearch.NoSuchIndexException; import org.springframework.data.elasticsearch.UncategorizedElasticsearchException; import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation; import org.springframework.data.elasticsearch.client.erhlc.ReactiveClusterOperations; -import org.springframework.data.elasticsearch.client.util.ScrollState; import org.springframework.data.elasticsearch.core.AbstractReactiveElasticsearchTemplate; import org.springframework.data.elasticsearch.core.AggregationContainer; import org.springframework.data.elasticsearch.core.IndexedObjectInformation; @@ -54,6 +58,7 @@ import org.springframework.data.elasticsearch.core.document.SearchDocument; import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; +import org.springframework.data.elasticsearch.core.query.BaseQuery; import org.springframework.data.elasticsearch.core.query.BulkOptions; import org.springframework.data.elasticsearch.core.query.ByQueryResponse; import org.springframework.data.elasticsearch.core.query.Query; @@ -64,6 +69,7 @@ import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; +import org.springframework.util.StringUtils; /** * Implementation of {@link org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations} using the new @@ -74,6 +80,8 @@ */ public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate { + private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveElasticsearchTemplate.class); + private final ReactiveElasticsearchClient client; private final RequestConverter requestConverter; private final ResponseConverter responseConverter; @@ -136,6 +144,32 @@ public Flux saveAll(Mono> entitiesPubli }); } + @Override + protected Mono doExists(String id, IndexCoordinates index) { + + Assert.notNull(id, "id must not be null"); + Assert.notNull(index, "index must not be null"); + + GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true); + + return Mono.from(execute( + ((ClientCallback>>) client -> client.get(getRequest, EntityAsMap.class)))) + .map(GetResult::found) // + .onErrorReturn(NoSuchIndexException.class, false); + } + + @Override + public Mono delete(Query query, Class entityType, IndexCoordinates index) { + + Assert.notNull(query, "query must not be null"); + + DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index, + getRefreshPolicy()); + return Mono + .from(execute((ClientCallback>) client -> client.deleteByQuery(request))) + .map(responseConverter::byQueryResponse); + } + @Override public Mono get(String id, Class entityType, IndexCoordinates index) { @@ -183,6 +217,29 @@ public Mono submitReindex(ReindexRequest reindexRequest) { : Mono.just(response.task())); } + @Override + public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { + + Assert.notNull(updateQuery, "UpdateQuery must not be null"); + Assert.notNull(index, "Index must not be null"); + + UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), + routingResolver.getRouting()); + + return Mono.from(execute( + (ClientCallback>>) client -> client + .update(request, Document.class))) + .flatMap(response -> { + UpdateResponse.Result result = result(response.result()); + return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); + }); + } + + @Override + public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { + throw new UnsupportedOperationException("not implemented"); + } + @Override public Mono bulkUpdate(List queries, BulkOptions bulkOptions, IndexCoordinates index) { @@ -279,87 +336,108 @@ protected ReactiveElasticsearchTemplate doCopy() { return new ReactiveElasticsearchTemplate(client, converter); } - @Override - protected Mono doExists(String id, IndexCoordinates index) { - - Assert.notNull(id, "id must not be null"); - Assert.notNull(index, "index must not be null"); - - GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true); - - return Mono.from(execute( - ((ClientCallback>>) client -> client.get(getRequest, EntityAsMap.class)))) - .map(GetResult::found) // - .onErrorReturn(NoSuchIndexException.class, false); - } - - @Override - public Mono delete(Query query, Class entityType, IndexCoordinates index) { - - Assert.notNull(query, "query must not be null"); - - DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index, - getRefreshPolicy()); - return Mono - .from(execute((ClientCallback>) client -> client.deleteByQuery(request))) - .map(responseConverter::byQueryResponse); - } - // region search operations @Override protected Flux doFind(Query query, Class clazz, IndexCoordinates index) { return Flux.defer(() -> { - boolean useScroll = !(query.getPageable().isPaged() || query.isLimiting()); - SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, useScroll); + boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting()); - if (useScroll) { - return doScroll(searchRequest); - } else { - return doFind(searchRequest); - } + return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index); }); } - private Flux doScroll(SearchRequest searchRequest) { + private Flux doFindUnbounded(Query query, Class clazz, IndexCoordinates index) { + + if (query instanceof BaseQuery baseQuery) { + var pitKeepAlive = Duration.ofMinutes(5); + // setup functions for Flux.usingWhen() + Mono resourceSupplier = openPointInTime(index, pitKeepAlive, true) + .map(pit -> new PitSearchAfter(baseQuery, pit)); + + Function> asyncComplete = this::cleanupPit; + + BiFunction> asyncError = (psa, ex) -> { + if (LOGGER.isErrorEnabled()) { + LOGGER.error(String.format("Error during pit/search_after"), ex); + } + return cleanupPit(psa); + }; + + Function> asyncCancel = psa -> { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn(String.format("pit/search_after was cancelled")); + } + return cleanupPit(psa); + }; - Time scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll() : Time.of(t -> t.time("1m")); + Function>> resourceClosure = psa -> { - Flux> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), // - state -> Mono - .from(execute((ClientCallback>>) client -> client.search(searchRequest, - EntityAsMap.class))) // - .expand(entityAsMapSearchResponse -> { + baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); + baseQuery.addSort(Sort.by("_shard_doc")); + SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true); - state.updateScrollId(entityAsMapSearchResponse.scrollId()); + return Mono.from(execute((ClientCallback>>) client -> client + .search(firstSearchRequest, EntityAsMap.class))).expand(entityAsMapSearchResponse -> { - if (entityAsMapSearchResponse.hits() == null - || CollectionUtils.isEmpty(entityAsMapSearchResponse.hits().hits())) { + var hits = entityAsMapSearchResponse.hits().hits(); + if (CollectionUtils.isEmpty(hits)) { return Mono.empty(); } - return Mono.from(execute((ClientCallback>>) client1 -> { - ScrollRequest scrollRequest = ScrollRequest - .of(sr -> sr.scrollId(state.getScrollId()).scroll(scrollTimeout)); - return client1.scroll(scrollRequest, EntityAsMap.class); - })); - }), - this::cleanupScroll, (state, ex) -> cleanupScroll(state), this::cleanupScroll); + List sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject) + .collect(Collectors.toList()); + baseQuery.setSearchAfter(sortOptions); + SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true); + return Mono.from(execute((ClientCallback>>) client -> client + .search(followSearchRequest, EntityAsMap.class))); + }); - return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) - .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + }; + + Flux> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete, + asyncError, asyncCancel); + return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits()) + .map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper)); + } else { + return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery")); + } } - private Publisher cleanupScroll(ScrollState state) { + private Publisher cleanupPit(PitSearchAfter psa) { + var baseQuery = psa.getBaseQuery(); + baseQuery.setPointInTime(null); + baseQuery.setSearchAfter(null); + baseQuery.setSort(psa.getSort()); + var pit = psa.getPit(); + return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty(); + } + + static private class PitSearchAfter { + private final BaseQuery baseQuery; + @Nullable private final Sort sort; + private final String pit; + + PitSearchAfter(BaseQuery baseQuery, String pit) { + this.baseQuery = baseQuery; + this.sort = baseQuery.getSort(); + this.pit = pit; + } + + public BaseQuery getBaseQuery() { + return baseQuery; + } - if (state.getScrollIds().isEmpty()) { - return Mono.empty(); + @Nullable + public Sort getSort() { + return sort; } - return execute((ClientCallback>) client -> client - .clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds())))); + public String getPit() { + return pit; + } } @Override @@ -368,7 +446,7 @@ protected Mono doCount(Query query, Class entityType, IndexCoordinates Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true, false); + SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true); return Mono .from(execute((ClientCallback>>) client -> client.search(searchRequest, @@ -376,7 +454,9 @@ protected Mono doCount(Query query, Class entityType, IndexCoordinates .map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L); } - private Flux doFind(SearchRequest searchRequest) { + private Flux doFindBounded(Query query, Class clazz, IndexCoordinates index) { + + SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false); return Mono .from(execute((ClientCallback>>) client -> client.search(searchRequest, @@ -391,7 +471,7 @@ protected Mono doFindForResponse(Query query, Class< Assert.notNull(query, "query must not be null"); Assert.notNull(index, "index must not be null"); - SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false); + SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false); // noinspection unchecked SearchDocumentCallback callback = new ReadSearchDocumentCallback<>((Class) clazz, index); @@ -458,29 +538,6 @@ public Mono getClusterVersion() { })).map(infoResponse -> infoResponse.version().number()); } - @Override - public Mono update(UpdateQuery updateQuery, IndexCoordinates index) { - - Assert.notNull(updateQuery, "UpdateQuery must not be null"); - Assert.notNull(index, "Index must not be null"); - - UpdateRequest request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(), - routingResolver.getRouting()); - - return Mono.from(execute( - (ClientCallback>>) client -> client - .update(request, Document.class))) - .flatMap(response -> { - UpdateResponse.Result result = result(response.result()); - return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result)); - }); - } - - @Override - public Mono updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) { - throw new UnsupportedOperationException("not implemented"); - } - @Override @Deprecated public Publisher execute(ReactiveElasticsearchOperations.ClientCallback> callback) { diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index 5c7678e4b..355bce2e7 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -15,12 +15,8 @@ */ package org.springframework.data.elasticsearch.client.elc; -import static org.springframework.data.elasticsearch.client.elc.TypeUtils.searchType; -import static org.springframework.data.elasticsearch.client.elc.TypeUtils.slices; -import static org.springframework.data.elasticsearch.client.elc.TypeUtils.time; -import static org.springframework.data.elasticsearch.client.elc.TypeUtils.timeStringMs; -import static org.springframework.data.elasticsearch.client.elc.TypeUtils.toFloat; -import static org.springframework.util.CollectionUtils.isEmpty; +import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*; +import static org.springframework.util.CollectionUtils.*; import co.elastic.clients.elasticsearch._types.Conflicts; import co.elastic.clients.elasticsearch._types.FieldValue; @@ -37,18 +33,7 @@ import co.elastic.clients.elasticsearch._types.mapping.TypeMapping; import co.elastic.clients.elasticsearch._types.query_dsl.Like; import co.elastic.clients.elasticsearch.cluster.HealthRequest; -import co.elastic.clients.elasticsearch.core.BulkRequest; -import co.elastic.clients.elasticsearch.core.ClosePointInTimeRequest; -import co.elastic.clients.elasticsearch.core.DeleteByQueryRequest; -import co.elastic.clients.elasticsearch.core.DeleteRequest; -import co.elastic.clients.elasticsearch.core.GetRequest; -import co.elastic.clients.elasticsearch.core.IndexRequest; -import co.elastic.clients.elasticsearch.core.MgetRequest; -import co.elastic.clients.elasticsearch.core.MsearchRequest; -import co.elastic.clients.elasticsearch.core.OpenPointInTimeRequest; -import co.elastic.clients.elasticsearch.core.SearchRequest; -import co.elastic.clients.elasticsearch.core.UpdateByQueryRequest; -import co.elastic.clients.elasticsearch.core.UpdateRequest; +import co.elastic.clients.elasticsearch.core.*; import co.elastic.clients.elasticsearch.core.bulk.BulkOperation; import co.elastic.clients.elasticsearch.core.bulk.CreateOperation; import co.elastic.clients.elasticsearch.core.bulk.IndexOperation; @@ -58,17 +43,8 @@ import co.elastic.clients.elasticsearch.core.search.Highlight; import co.elastic.clients.elasticsearch.core.search.Rescore; import co.elastic.clients.elasticsearch.core.search.SourceConfig; -import co.elastic.clients.elasticsearch.indices.CreateIndexRequest; -import co.elastic.clients.elasticsearch.indices.DeleteIndexRequest; +import co.elastic.clients.elasticsearch.indices.*; import co.elastic.clients.elasticsearch.indices.ExistsRequest; -import co.elastic.clients.elasticsearch.indices.GetAliasRequest; -import co.elastic.clients.elasticsearch.indices.GetIndexRequest; -import co.elastic.clients.elasticsearch.indices.GetIndicesSettingsRequest; -import co.elastic.clients.elasticsearch.indices.GetMappingRequest; -import co.elastic.clients.elasticsearch.indices.IndexSettings; -import co.elastic.clients.elasticsearch.indices.PutMappingRequest; -import co.elastic.clients.elasticsearch.indices.RefreshRequest; -import co.elastic.clients.elasticsearch.indices.UpdateAliasesRequest; import co.elastic.clients.elasticsearch.indices.update_aliases.Action; import co.elastic.clients.json.JsonData; import co.elastic.clients.json.JsonpDeserializer; @@ -106,19 +82,7 @@ import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentProperty; import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates; -import org.springframework.data.elasticsearch.core.query.BaseQuery; -import org.springframework.data.elasticsearch.core.query.BulkOptions; -import org.springframework.data.elasticsearch.core.query.CriteriaQuery; -import org.springframework.data.elasticsearch.core.query.GeoDistanceOrder; -import org.springframework.data.elasticsearch.core.query.IndexQuery; -import org.springframework.data.elasticsearch.core.query.MoreLikeThisQuery; -import org.springframework.data.elasticsearch.core.query.Order; -import org.springframework.data.elasticsearch.core.query.Query; -import org.springframework.data.elasticsearch.core.query.RescorerQuery; -import org.springframework.data.elasticsearch.core.query.ScriptData; -import org.springframework.data.elasticsearch.core.query.SourceFilter; -import org.springframework.data.elasticsearch.core.query.StringQuery; -import org.springframework.data.elasticsearch.core.query.UpdateQuery; +import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.elasticsearch.core.reindex.ReindexRequest; import org.springframework.data.elasticsearch.core.reindex.Remote; import org.springframework.data.elasticsearch.support.DefaultStringObjectMap; @@ -1030,18 +994,22 @@ public UpdateByQueryRequest documentUpdateByQueryRequest(UpdateQuery updateQuery // region search public SearchRequest searchRequest(Query query, @Nullable Class clazz, IndexCoordinates indexCoordinates, - boolean forCount, long scrollTimeInMillis) { + boolean forCount) { + return searchRequest(query, clazz, indexCoordinates, forCount, false, null); + } + public SearchRequest searchRequest(Query query, @Nullable Class clazz, IndexCoordinates indexCoordinates, + boolean forCount, long scrollTimeInMillis) { return searchRequest(query, clazz, indexCoordinates, forCount, true, scrollTimeInMillis); } public SearchRequest searchRequest(Query query, @Nullable Class clazz, IndexCoordinates indexCoordinates, - boolean forCount, boolean useScroll) { - return searchRequest(query, clazz, indexCoordinates, forCount, useScroll, null); + boolean forCount, boolean forBatchedSearch) { + return searchRequest(query, clazz, indexCoordinates, forCount, forBatchedSearch, null); } public SearchRequest searchRequest(Query query, @Nullable Class clazz, IndexCoordinates indexCoordinates, - boolean forCount, boolean useScroll, @Nullable Long scrollTimeInMillis) { + boolean forCount, boolean forBatchedSearch, @Nullable Long scrollTimeInMillis) { String[] indexNames = indexCoordinates.getIndexNames(); Assert.notNull(query, "query must not be null"); @@ -1049,7 +1017,7 @@ public SearchRequest searchRequest(Query query, @Nullable Class clazz, In elasticsearchConverter.updateQuery(query, clazz); SearchRequest.Builder builder = new SearchRequest.Builder(); - prepareSearchRequest(query, clazz, indexCoordinates, builder, forCount, useScroll); + prepareSearchRequest(query, clazz, indexCoordinates, builder, forCount, forBatchedSearch); if (scrollTimeInMillis != null) { builder.scroll(t -> t.time(scrollTimeInMillis + "ms")); @@ -1184,7 +1152,7 @@ public MsearchRequest searchMsearchRequest( } private void prepareSearchRequest(Query query, @Nullable Class clazz, IndexCoordinates indexCoordinates, - SearchRequest.Builder builder, boolean forCount, boolean useScroll) { + SearchRequest.Builder builder, boolean forCount, boolean forBatchedSearch) { String[] indexNames = indexCoordinates.getIndexNames(); @@ -1307,11 +1275,9 @@ private void prepareSearchRequest(Query query, @Nullable Class clazz, Ind builder.size(0) // .trackTotalHits(th -> th.count(Integer.MAX_VALUE)) // .source(SourceConfig.of(sc -> sc.fetch(false))); - } else if (useScroll) { + } else if (forBatchedSearch) { // request_cache is not allowed on scroll requests. builder.requestCache(null); - Duration scrollTimeout = query.getScrollTime() != null ? query.getScrollTime() : Duration.ofMinutes(1); - builder.scroll(time(scrollTimeout)); // limit the number of documents in a batch builder.size(query.getReactiveBatchSize()); } diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java index 346a125ae..71c45ef36 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/TypeUtils.java @@ -125,6 +125,36 @@ static String toString(@Nullable FieldValue fieldValue) { default -> throw new IllegalStateException("Unexpected value: " + fieldValue._kind()); } } + @Nullable + static Object toObject(@Nullable FieldValue fieldValue) { + + if (fieldValue == null) { + return null; + } + + switch (fieldValue._kind()) { + case Double -> { + return Double.valueOf(fieldValue.doubleValue()); + } + case Long -> { + return Long.valueOf(fieldValue.longValue()); + } + case Boolean -> { + return Boolean.valueOf(fieldValue.booleanValue()); + } + case String -> { + return fieldValue.stringValue(); + } + case Null -> { + return null; + } + case Any -> { + return fieldValue.anyValue().toString(); + } + + default -> throw new IllegalStateException("Unexpected value: " + fieldValue._kind()); + } + } @Nullable static GeoDistanceType geoDistanceType(GeoDistanceOrder.DistanceType distanceType) { diff --git a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java index e924438f4..891bade17 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/query/BaseQuery.java @@ -111,6 +111,13 @@ public > BaseQuery(BaseQue this.reactiveBatchSize = builder.getReactiveBatchSize(); } + /** + * @since 5.1 + */ + public void setSort(@Nullable Sort sort) { + this.sort = sort; + } + @Override @Nullable public Sort getSort() { diff --git a/src/test/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchPartQueryELCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchPartQueryELCIntegrationTests.java index 46507630a..7080e3b12 100644 --- a/src/test/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchPartQueryELCIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchPartQueryELCIntegrationTests.java @@ -40,7 +40,7 @@ protected String buildQueryString(Query query, Class clazz) { JacksonJsonpMapper jsonpMapper = new JacksonJsonpMapper(); RequestConverter requestConverter = new RequestConverter(operations.getElasticsearchConverter(), jsonpMapper); - SearchRequest request = requestConverter.searchRequest(query, clazz, IndexCoordinates.of("dummy"), false, false); + SearchRequest request = requestConverter.searchRequest(query, clazz, IndexCoordinates.of("dummy"), false); return JsonUtils.toJson(request, jsonpMapper); // return "{\"query\":" + JsonUtils.toJson(request.query(), jsonpMapper) + "}"; diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java index bb28ae81d..a4529f356 100644 --- a/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java @@ -463,11 +463,14 @@ public void findWithoutPagingShouldReadAll() { index(IntStream.range(0, 100).mapToObj(it -> randomEntity("entity - " + it)).toArray(SampleEntity[]::new)); - CriteriaQuery query = new CriteriaQuery(new Criteria("message").contains("entity")) // - .addSort(Sort.by("message"))// - .setPageable(Pageable.unpaged()); + var query = CriteriaQuery.builder(new Criteria("message").contains("entity")) // + .withSort(Sort.by("message")) // + .withPageable(Pageable.unpaged()) // + .withReactiveBatchSize(20) // + .build(); - operations.search(query, SampleEntity.class).as(StepVerifier::create) // + operations.search(query, SampleEntity.class) // + .as(StepVerifier::create) // .expectNextCount(100) // .verifyComplete(); } diff --git a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ClusterConnection.java b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ClusterConnection.java index 2f702c935..bcbf2e181 100644 --- a/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ClusterConnection.java +++ b/src/test/java/org/springframework/data/elasticsearch/junit/jupiter/ClusterConnection.java @@ -18,6 +18,7 @@ import static org.springframework.util.StringUtils.*; import java.io.InputStream; +import java.time.Duration; import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -131,7 +132,7 @@ private ClusterConnectionInfo startElasticsearchContainer() { DockerImageName dockerImageName = getDockerImageName(testcontainersProperties); ElasticsearchContainer elasticsearchContainer = new SpringDataElasticsearchContainer(dockerImageName) - .withEnv(testcontainersProperties); + .withEnv(testcontainersProperties).withStartupTimeout(Duration.ofMinutes(2)); elasticsearchContainer.start(); return ClusterConnectionInfo.builder() //