Skip to content

Enable use of search_after with field_collapse #2937

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,28 @@ private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexC
Function<PitSearchAfter, Publisher<? extends ResponseBody<EntityAsMap>>> resourceClosure = psa -> {

baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive));
baseQuery.addSort(Sort.by("_shard_doc"));

// only add _shard_doc if there is not a field_collapse and a sort with the same name
boolean addShardDoc = true;

if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) {
var field = nativeQuery.getFieldCollapse().field();

if (nativeQuery.getSortOptions().stream()
.anyMatch(sortOptions -> sortOptions.isField() && sortOptions.field().field().equals(field))) {
addShardDoc = false;
}

if (query.getSort() != null
&& query.getSort().stream().anyMatch(order -> order.getProperty().equals(field))) {
addShardDoc = false;
}
}

if (addShardDoc) {
baseQuery.addSort(Sort.by("_shard_doc"));
}

SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(),
clazz, index, false, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,8 +1487,8 @@ private <T> void prepareSearchRequest(Query query, @Nullable String routing, @Nu
if (query instanceof NativeQuery nativeQuery) {
prepareNativeSearch(nativeQuery, builder);
}
// query.getSort() must be checked after prepareNativeSearch as this already might hav a sort set that must have
// higher priority
// query.getSort() must be checked after prepareNativeSearch as this already might have a sort set
// that must have higher priority
if (query.getSort() != null) {
List<SortOptions> sortOptions = getSortOptions(query.getSort(), persistentEntity);

Expand All @@ -1510,7 +1510,15 @@ private <T> void prepareSearchRequest(Query query, @Nullable String routing, @Nu
}

if (!isEmpty(query.getSearchAfter())) {
builder.searchAfter(query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList());
var fieldValues = query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList();

// when there is a field collapse on a native query, and we have a search_after, then the search_after
// must only have one entry
if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) {
builder.searchAfter(fieldValues.get(0));
} else {
builder.searchAfter(fieldValues);
}
}

query.getRescorerQueries().forEach(rescorerQuery -> builder.rescore(getRescore(rescorerQuery)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,22 @@
*/
package org.springframework.data.elasticsearch.repository.support;

import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.client.elc.Queries;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
import org.springframework.data.elasticsearch.repositories.custommethod.QueryParameter;
import org.springframework.data.elasticsearch.repository.config.EnableReactiveElasticsearchRepositories;
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
import org.springframework.test.context.ContextConfiguration;
import reactor.test.StepVerifier;

/**
* @author Peter-Josef Meisch
Expand Down Expand Up @@ -51,4 +59,33 @@ QueryParameter queryParameter() {
}
}

/**
* search_after is used by the reactive search operation, it normally always adds _shard_doc as a tiebreaker sort
* parameter. This must not be done when a collapse field is used as sort field, as in that case the collapse field
* must be the only sort field.
*/
@Test // #2935
@DisplayName("should use collapse_field for search_after in pit search")
void shouldUseCollapseFieldForSearchAfterI() {
var entity = new SampleEntity();
entity.setId("42");
entity.setMessage("m");
entity.setKeyword("kw");
repository.save(entity).block();

var query = NativeQuery.builder()
.withQuery(Queries.matchAllQueryAsQuery())
.withPageable(Pageable.unpaged())
.withFieldCollapse(FieldCollapse.of(fcb -> fcb
.field("keyword")))
.withSort(Sort.by("keyword"))
.build();

operations.search(query, SampleEntity.class)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
}


}