Switch reactive unpaged search from scroll to pit with search_after. #2393
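For context, the scroll API keeps mutable, per-request search state on the server, while a point in time (PIT) freezes a consistent view of the index that can then be paged statelessly with search_after. The sketch below is illustrative only and not part of this diff: it shows the pit/search_after request cycle that the new doFindUnbounded() drives reactively, written against the blocking Elasticsearch Java client 8.x for brevity. The index name "products", the Product record, esClient, and readAllHits are assumed names for this example.

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import java.io.IOException;
import java.util.List;

class PitSearchAfterSketch {

	record Product(String name) {}

	static void readAllHits(ElasticsearchClient esClient) throws IOException {

		// 1. open a point in time: a frozen, consistent view of the index
		String pitId = esClient.openPointInTime(o -> o
				.index("products")
				.keepAlive(k -> k.time("5m"))).id();

		try {
			List<FieldValue> searchAfter = null;

			while (true) {
				List<FieldValue> after = searchAfter;

				// 2. page through the PIT; tiebreak on _shard_doc as the PR does
				SearchResponse<Product> page = esClient.search(s -> {
					s.pit(p -> p.id(pitId).keepAlive(k -> k.time("5m")))
							.sort(so -> so.field(f -> f.field("_shard_doc").order(SortOrder.Asc)))
							.size(1000);
					if (after != null) {
						s.searchAfter(after); // 3. resume after the last hit of the previous page
					}
					return s;
				}, Product.class);

				List<Hit<Product>> hits = page.hits().hits();
				if (hits.isEmpty()) {
					break;
				}
				searchAfter = hits.get(hits.size() - 1).sort(); // sort values seed the next request
			}
		} finally {
			// 4. always release the PIT, mirroring cleanupPit() in the template
			esClient.closePointInTime(c -> c.id(pitId));
		}
	}
}

Sorting on _shard_doc gives a cheap, total tiebreaker for search_after, and closing the PIT in a finally block mirrors what the template does through the asyncComplete/asyncError/asyncCancel callbacks of Flux.usingWhen().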

Merged
Changes from all commits
ElasticsearchTemplate.java
@@ -306,7 +306,7 @@ public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index)
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

-		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true, false);
+		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true);

SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));

@@ -319,7 +319,7 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

-		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
+		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);
SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));

ReadDocumentCallback<T> readDocumentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);
ReactiveElasticsearchTemplate.java
@@ -19,7 +19,6 @@
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;

import co.elastic.clients.elasticsearch._types.Result;
-import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.get.GetResult;
@@ -35,14 +34,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;

import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.domain.Sort;
import org.springframework.data.elasticsearch.BulkFailureException;
import org.springframework.data.elasticsearch.NoSuchIndexException;
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation;
import org.springframework.data.elasticsearch.client.erhlc.ReactiveClusterOperations;
-import org.springframework.data.elasticsearch.client.util.ScrollState;
import org.springframework.data.elasticsearch.core.AbstractReactiveElasticsearchTemplate;
import org.springframework.data.elasticsearch.core.AggregationContainer;
import org.springframework.data.elasticsearch.core.IndexedObjectInformation;
@@ -54,6 +58,7 @@
import org.springframework.data.elasticsearch.core.document.SearchDocument;
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
+import org.springframework.data.elasticsearch.core.query.BaseQuery;
import org.springframework.data.elasticsearch.core.query.BulkOptions;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.query.Query;
@@ -64,6 +69,7 @@
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;

/**
* Implementation of {@link org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations} using the new
@@ -74,6 +80,8 @@
*/
public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate {

+	private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveElasticsearchTemplate.class);
+
private final ReactiveElasticsearchClient client;
private final RequestConverter requestConverter;
private final ResponseConverter responseConverter;
@@ -136,6 +144,32 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
});
}

+	@Override
+	protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
+
+		Assert.notNull(id, "id must not be null");
+		Assert.notNull(index, "index must not be null");
+
+		GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
+
+		return Mono.from(execute(
+				((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
+				.map(GetResult::found) //
+				.onErrorReturn(NoSuchIndexException.class, false);
+	}
+
+	@Override
+	public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
+
+		Assert.notNull(query, "query must not be null");
+
+		DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
+				getRefreshPolicy());
+		return Mono
+				.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
+				.map(responseConverter::byQueryResponse);
+	}

@Override
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {

Expand Down Expand Up @@ -183,6 +217,29 @@ public Mono<String> submitReindex(ReindexRequest reindexRequest) {
: Mono.just(response.task()));
}

+	@Override
+	public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
+
+		Assert.notNull(updateQuery, "UpdateQuery must not be null");
+		Assert.notNull(index, "Index must not be null");
+
+		UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
+				routingResolver.getRouting());
+
+		return Mono.from(execute(
+				(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
+						.update(request, Document.class)))
+				.flatMap(response -> {
+					UpdateResponse.Result result = result(response.result());
+					return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
+				});
+	}
+
+	@Override
+	public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
+		throw new UnsupportedOperationException("not implemented");
+	}

@Override
public Mono<Void> bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {

@@ -279,87 +336,108 @@ protected ReactiveElasticsearchTemplate doCopy() {
return new ReactiveElasticsearchTemplate(client, converter);
}

-	@Override
-	protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
-
-		Assert.notNull(id, "id must not be null");
-		Assert.notNull(index, "index must not be null");
-
-		GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
-
-		return Mono.from(execute(
-				((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
-				.map(GetResult::found) //
-				.onErrorReturn(NoSuchIndexException.class, false);
-	}
-
-	@Override
-	public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
-
-		Assert.notNull(query, "query must not be null");
-
-		DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
-				getRefreshPolicy());
-		return Mono
-				.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
-				.map(responseConverter::byQueryResponse);
-	}

// region search operations

@Override
protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {

return Flux.defer(() -> {
-			boolean useScroll = !(query.getPageable().isPaged() || query.isLimiting());
-			SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, useScroll);
+			boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting());

-			if (useScroll) {
-				return doScroll(searchRequest);
-			} else {
-				return doFind(searchRequest);
-			}
+			return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index);
});

}

-	private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {
-
-		Time scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll() : Time.of(t -> t.time("1m"));
-
-		Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), //
-				state -> Mono
-						.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
-								EntityAsMap.class))) //
-						.expand(entityAsMapSearchResponse -> {
-
-							state.updateScrollId(entityAsMapSearchResponse.scrollId());
-
-							if (entityAsMapSearchResponse.hits() == null
-									|| CollectionUtils.isEmpty(entityAsMapSearchResponse.hits().hits())) {
-								return Mono.empty();
-							}
-
-							return Mono.from(execute((ClientCallback<Publisher<ScrollResponse<EntityAsMap>>>) client1 -> {
-								ScrollRequest scrollRequest = ScrollRequest
-										.of(sr -> sr.scrollId(state.getScrollId()).scroll(scrollTimeout));
-								return client1.scroll(scrollRequest, EntityAsMap.class);
-							}));
-						}),
-				this::cleanupScroll, (state, ex) -> cleanupScroll(state), this::cleanupScroll);
-
-		return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits())
-				.map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper));
+	private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexCoordinates index) {
+
+		if (query instanceof BaseQuery baseQuery) {
+			var pitKeepAlive = Duration.ofMinutes(5);
+			// setup functions for Flux.usingWhen()
+			Mono<PitSearchAfter> resourceSupplier = openPointInTime(index, pitKeepAlive, true)
+					.map(pit -> new PitSearchAfter(baseQuery, pit));
+
+			Function<PitSearchAfter, Publisher<?>> asyncComplete = this::cleanupPit;
+
+			BiFunction<PitSearchAfter, Throwable, Publisher<?>> asyncError = (psa, ex) -> {
+				if (LOGGER.isErrorEnabled()) {
+					LOGGER.error("Error during pit/search_after", ex);
+				}
+				return cleanupPit(psa);
+			};
+
+			Function<PitSearchAfter, Publisher<?>> asyncCancel = psa -> {
+				if (LOGGER.isWarnEnabled()) {
+					LOGGER.warn("pit/search_after was cancelled");
+				}
+				return cleanupPit(psa);
+			};
+
+			Function<PitSearchAfter, Publisher<? extends ResponseBody<EntityAsMap>>> resourceClosure = psa -> {
+
+				baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive));
+				baseQuery.addSort(Sort.by("_shard_doc"));
+				SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
+
+				return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
+						.search(firstSearchRequest, EntityAsMap.class))).expand(entityAsMapSearchResponse -> {
+
+							var hits = entityAsMapSearchResponse.hits().hits();
+							if (CollectionUtils.isEmpty(hits)) {
+								return Mono.empty();
+							}
+
+							List<Object> sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject)
+									.collect(Collectors.toList());
+							baseQuery.setSearchAfter(sortOptions);
+							SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
+							return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
+									.search(followSearchRequest, EntityAsMap.class)));
+						});
+			};
+
+			Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete,
+					asyncError, asyncCancel);
+			return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits())
+					.map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper));
+		} else {
+			return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery"));
+		}
	}

-	private Publisher<?> cleanupScroll(ScrollState state) {
-
-		if (state.getScrollIds().isEmpty()) {
-			return Mono.empty();
-		}
-
-		return execute((ClientCallback<Publisher<ClearScrollResponse>>) client -> client
-				.clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds()))));
+	private Publisher<?> cleanupPit(PitSearchAfter psa) {
+		var baseQuery = psa.getBaseQuery();
+		baseQuery.setPointInTime(null);
+		baseQuery.setSearchAfter(null);
+		baseQuery.setSort(psa.getSort());
+		var pit = psa.getPit();
+		return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty();
+	}
+
+	private static class PitSearchAfter {
+		private final BaseQuery baseQuery;
+		@Nullable private final Sort sort;
+		private final String pit;
+
+		PitSearchAfter(BaseQuery baseQuery, String pit) {
+			this.baseQuery = baseQuery;
+			this.sort = baseQuery.getSort();
+			this.pit = pit;
+		}
+
+		public BaseQuery getBaseQuery() {
+			return baseQuery;
+		}
+
+		@Nullable
+		public Sort getSort() {
+			return sort;
+		}
+
+		public String getPit() {
+			return pit;
		}
	}

@Override
@@ -368,15 +446,17 @@ protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

-		SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true, false);
+		SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true);

return Mono
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
EntityAsMap.class)))
.map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L);
}

-	private Flux<SearchDocument> doFind(SearchRequest searchRequest) {
+	private Flux<SearchDocument> doFindBounded(Query query, Class<?> clazz, IndexCoordinates index) {
+
+		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);

return Mono
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
@@ -391,7 +471,7 @@ protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<
Assert.notNull(query, "query must not be null");
Assert.notNull(index, "index must not be null");

-		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
+		SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);

// noinspection unchecked
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>((Class<T>) clazz, index);
@@ -458,29 +538,6 @@ public Mono<String> getClusterVersion() {
})).map(infoResponse -> infoResponse.version().number());
}

-	@Override
-	public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
-
-		Assert.notNull(updateQuery, "UpdateQuery must not be null");
-		Assert.notNull(index, "Index must not be null");
-
-		UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
-				routingResolver.getRouting());
-
-		return Mono.from(execute(
-				(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
-						.update(request, Document.class)))
-				.flatMap(response -> {
-					UpdateResponse.Result result = result(response.result());
-					return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
-				});
-	}
-
-	@Override
-	public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
-		throw new UnsupportedOperationException("not implemented");
-	}

@Override
@Deprecated
public <T> Publisher<T> execute(ReactiveElasticsearchOperations.ClientCallback<Publisher<T>> callback) {
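For completeness, a usage sketch (again not part of the diff): after this change, an unpaged, non-limiting reactive search such as the one below goes through the pit/search_after path instead of scroll. The operations bean, the Product entity, and the "products" index name are assumed caller-side names for this example.

import org.springframework.data.annotation.Id;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.SearchHit;
import org.springframework.data.elasticsearch.core.query.Query;
import reactor.core.publisher.Flux;

class UnboundedSearchSketch {

	@Document(indexName = "products")
	record Product(@Id String id, String name) {}

	Flux<SearchHit<Product>> readAll(ReactiveElasticsearchOperations operations) {

		// unpaged and not limiting -> the template's doFindUnbounded() (pit/search_after) path
		Query query = NativeQuery.builder() //
				.withQuery(q -> q.matchAll(ma -> ma)) //
				.withPageable(Pageable.unpaged()) //
				.build();

		return operations.search(query, Product.class);
	}
}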