From c8f74806186cbe6b77f39b3eb14a8819b78f9628 Mon Sep 17 00:00:00 2001 From: Peter-Josef Meisch Date: Sat, 6 Jul 2024 09:06:43 +0200 Subject: [PATCH] Enable use of search_after with field_collapse Closes #2935 --- .../elc/ReactiveElasticsearchTemplate.java | 23 +++++++++++- .../client/elc/RequestConverter.java | 14 +++++-- ...icsearchRepositoryELCIntegrationTests.java | 37 +++++++++++++++++++ 3 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java index 12f1a59ee..5f4fd0360 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java @@ -395,7 +395,28 @@ private Flux doFindUnbounded(Query query, Class clazz, IndexC Function>> resourceClosure = psa -> { baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive)); - baseQuery.addSort(Sort.by("_shard_doc")); + + // only add _shard_doc if there is not a field_collapse and a sort with the same name + boolean addShardDoc = true; + + if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) { + var field = nativeQuery.getFieldCollapse().field(); + + if (nativeQuery.getSortOptions().stream() + .anyMatch(sortOptions -> sortOptions.isField() && sortOptions.field().field().equals(field))) { + addShardDoc = false; + } + + if (query.getSort() != null + && query.getSort().stream().anyMatch(order -> order.getProperty().equals(field))) { + addShardDoc = false; + } + } + + if (addShardDoc) { + baseQuery.addSort(Sort.by("_shard_doc")); + } + SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(), clazz, index, false, true); diff --git a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java index 614f958cf..216c68753 100644 --- a/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java +++ b/src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java @@ -1487,8 +1487,8 @@ private void prepareSearchRequest(Query query, @Nullable String routing, @Nu if (query instanceof NativeQuery nativeQuery) { prepareNativeSearch(nativeQuery, builder); } - // query.getSort() must be checked after prepareNativeSearch as this already might hav a sort set that must have - // higher priority + // query.getSort() must be checked after prepareNativeSearch as this already might have a sort set + // that must have higher priority if (query.getSort() != null) { List sortOptions = getSortOptions(query.getSort(), persistentEntity); @@ -1510,7 +1510,15 @@ private void prepareSearchRequest(Query query, @Nullable String routing, @Nu } if (!isEmpty(query.getSearchAfter())) { - builder.searchAfter(query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList()); + var fieldValues = query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList(); + + // when there is a field collapse on a native query, and we have a search_after, then the search_after + // must only have one entry + if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) { + builder.searchAfter(fieldValues.get(0)); + } else { + builder.searchAfter(fieldValues); + } } query.getRescorerQueries().forEach(rescorerQuery -> builder.rescore(getRescore(rescorerQuery))); diff --git a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryELCIntegrationTests.java b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryELCIntegrationTests.java index b5aec9c92..6f0de2d24 100644 --- a/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryELCIntegrationTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryELCIntegrationTests.java @@ -15,14 +15,22 @@ */ package org.springframework.data.elasticsearch.repository.support; +import co.elastic.clients.elasticsearch.core.search.FieldCollapse; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; +import org.springframework.data.domain.Pageable; +import org.springframework.data.domain.Sort; +import org.springframework.data.elasticsearch.client.elc.NativeQuery; +import org.springframework.data.elasticsearch.client.elc.Queries; import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration; import org.springframework.data.elasticsearch.repositories.custommethod.QueryParameter; import org.springframework.data.elasticsearch.repository.config.EnableReactiveElasticsearchRepositories; import org.springframework.data.elasticsearch.utils.IndexNameProvider; import org.springframework.test.context.ContextConfiguration; +import reactor.test.StepVerifier; /** * @author Peter-Josef Meisch @@ -51,4 +59,33 @@ QueryParameter queryParameter() { } } + /** + * search_after is used by the reactive search operation, it normally always adds _shard_doc as a tiebreaker sort + * parameter. This must not be done when a collapse field is used as sort field, as in that case the collapse field + * must be the only sort field. + */ + @Test // #2935 + @DisplayName("should use collapse_field for search_after in pit search") + void shouldUseCollapseFieldForSearchAfterI() { + var entity = new SampleEntity(); + entity.setId("42"); + entity.setMessage("m"); + entity.setKeyword("kw"); + repository.save(entity).block(); + + var query = NativeQuery.builder() + .withQuery(Queries.matchAllQueryAsQuery()) + .withPageable(Pageable.unpaged()) + .withFieldCollapse(FieldCollapse.of(fcb -> fcb + .field("keyword"))) + .withSort(Sort.by("keyword")) + .build(); + + operations.search(query, SampleEntity.class) + .as(StepVerifier::create) + .expectNextCount(1) + .verifyComplete(); + } + + }