Skip to content

Commit e1c8a2a

Browse files
authored
Switch reactive unpaged search from scroll to pit with search_after.
Original Pull Request #2393 Closes #1685
1 parent 014aa3d commit e1c8a2a

File tree

8 files changed

+207
-143
lines changed

8 files changed

+207
-143
lines changed

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/ElasticsearchTemplate.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ public long count(Query query, @Nullable Class<?> clazz, IndexCoordinates index)
306306
Assert.notNull(query, "query must not be null");
307307
Assert.notNull(index, "index must not be null");
308308

309-
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true, false);
309+
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, true);
310310

311311
SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));
312312

@@ -319,7 +319,7 @@ public <T> SearchHits<T> search(Query query, Class<T> clazz, IndexCoordinates in
319319
Assert.notNull(query, "query must not be null");
320320
Assert.notNull(index, "index must not be null");
321321

322-
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
322+
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);
323323
SearchResponse<EntityAsMap> searchResponse = execute(client -> client.search(searchRequest, EntityAsMap.class));
324324

325325
ReadDocumentCallback<T> readDocumentCallback = new ReadDocumentCallback<>(elasticsearchConverter, clazz, index);

Diff for: src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java

+142-85
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static org.springframework.data.elasticsearch.client.elc.TypeUtils.*;
2020

2121
import co.elastic.clients.elasticsearch._types.Result;
22-
import co.elastic.clients.elasticsearch._types.Time;
2322
import co.elastic.clients.elasticsearch.core.*;
2423
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
2524
import co.elastic.clients.elasticsearch.core.get.GetResult;
@@ -35,14 +34,19 @@
3534
import java.util.HashMap;
3635
import java.util.List;
3736
import java.util.Map;
37+
import java.util.function.BiFunction;
38+
import java.util.function.Function;
39+
import java.util.stream.Collectors;
3840

3941
import org.reactivestreams.Publisher;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
import org.springframework.data.domain.Sort;
4045
import org.springframework.data.elasticsearch.BulkFailureException;
4146
import org.springframework.data.elasticsearch.NoSuchIndexException;
4247
import org.springframework.data.elasticsearch.UncategorizedElasticsearchException;
4348
import org.springframework.data.elasticsearch.client.UnsupportedBackendOperation;
4449
import org.springframework.data.elasticsearch.client.erhlc.ReactiveClusterOperations;
45-
import org.springframework.data.elasticsearch.client.util.ScrollState;
4650
import org.springframework.data.elasticsearch.core.AbstractReactiveElasticsearchTemplate;
4751
import org.springframework.data.elasticsearch.core.AggregationContainer;
4852
import org.springframework.data.elasticsearch.core.IndexedObjectInformation;
@@ -54,6 +58,7 @@
5458
import org.springframework.data.elasticsearch.core.document.SearchDocument;
5559
import org.springframework.data.elasticsearch.core.document.SearchDocumentResponse;
5660
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
61+
import org.springframework.data.elasticsearch.core.query.BaseQuery;
5762
import org.springframework.data.elasticsearch.core.query.BulkOptions;
5863
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
5964
import org.springframework.data.elasticsearch.core.query.Query;
@@ -64,6 +69,7 @@
6469
import org.springframework.lang.Nullable;
6570
import org.springframework.util.Assert;
6671
import org.springframework.util.CollectionUtils;
72+
import org.springframework.util.StringUtils;
6773

6874
/**
6975
* Implementation of {@link org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations} using the new
@@ -74,6 +80,8 @@
7480
*/
7581
public class ReactiveElasticsearchTemplate extends AbstractReactiveElasticsearchTemplate {
7682

83+
private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveElasticsearchTemplate.class);
84+
7785
private final ReactiveElasticsearchClient client;
7886
private final RequestConverter requestConverter;
7987
private final ResponseConverter responseConverter;
@@ -136,6 +144,32 @@ public <T> Flux<T> saveAll(Mono<? extends Collection<? extends T>> entitiesPubli
136144
});
137145
}
138146

147+
@Override
148+
protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
149+
150+
Assert.notNull(id, "id must not be null");
151+
Assert.notNull(index, "index must not be null");
152+
153+
GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
154+
155+
return Mono.from(execute(
156+
((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
157+
.map(GetResult::found) //
158+
.onErrorReturn(NoSuchIndexException.class, false);
159+
}
160+
161+
@Override
162+
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
163+
164+
Assert.notNull(query, "query must not be null");
165+
166+
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
167+
getRefreshPolicy());
168+
return Mono
169+
.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
170+
.map(responseConverter::byQueryResponse);
171+
}
172+
139173
@Override
140174
public <T> Mono<T> get(String id, Class<T> entityType, IndexCoordinates index) {
141175

@@ -183,6 +217,29 @@ public Mono<String> submitReindex(ReindexRequest reindexRequest) {
183217
: Mono.just(response.task()));
184218
}
185219

220+
@Override
221+
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
222+
223+
Assert.notNull(updateQuery, "UpdateQuery must not be null");
224+
Assert.notNull(index, "Index must not be null");
225+
226+
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
227+
routingResolver.getRouting());
228+
229+
return Mono.from(execute(
230+
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
231+
.update(request, Document.class)))
232+
.flatMap(response -> {
233+
UpdateResponse.Result result = result(response.result());
234+
return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
235+
});
236+
}
237+
238+
@Override
239+
public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
240+
throw new UnsupportedOperationException("not implemented");
241+
}
242+
186243
@Override
187244
public Mono<Void> bulkUpdate(List<UpdateQuery> queries, BulkOptions bulkOptions, IndexCoordinates index) {
188245

@@ -279,87 +336,108 @@ protected ReactiveElasticsearchTemplate doCopy() {
279336
return new ReactiveElasticsearchTemplate(client, converter);
280337
}
281338

282-
@Override
283-
protected Mono<Boolean> doExists(String id, IndexCoordinates index) {
284-
285-
Assert.notNull(id, "id must not be null");
286-
Assert.notNull(index, "index must not be null");
287-
288-
GetRequest getRequest = requestConverter.documentGetRequest(id, routingResolver.getRouting(), index, true);
289-
290-
return Mono.from(execute(
291-
((ClientCallback<Publisher<GetResponse<EntityAsMap>>>) client -> client.get(getRequest, EntityAsMap.class))))
292-
.map(GetResult::found) //
293-
.onErrorReturn(NoSuchIndexException.class, false);
294-
}
295-
296-
@Override
297-
public Mono<ByQueryResponse> delete(Query query, Class<?> entityType, IndexCoordinates index) {
298-
299-
Assert.notNull(query, "query must not be null");
300-
301-
DeleteByQueryRequest request = requestConverter.documentDeleteByQueryRequest(query, entityType, index,
302-
getRefreshPolicy());
303-
return Mono
304-
.from(execute((ClientCallback<Publisher<DeleteByQueryResponse>>) client -> client.deleteByQuery(request)))
305-
.map(responseConverter::byQueryResponse);
306-
}
307-
308339
// region search operations
309340

310341
@Override
311342
protected Flux<SearchDocument> doFind(Query query, Class<?> clazz, IndexCoordinates index) {
312343

313344
return Flux.defer(() -> {
314-
boolean useScroll = !(query.getPageable().isPaged() || query.isLimiting());
315-
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, useScroll);
345+
boolean queryIsUnbounded = !(query.getPageable().isPaged() || query.isLimiting());
316346

317-
if (useScroll) {
318-
return doScroll(searchRequest);
319-
} else {
320-
return doFind(searchRequest);
321-
}
347+
return queryIsUnbounded ? doFindUnbounded(query, clazz, index) : doFindBounded(query, clazz, index);
322348
});
323349

324350
}
325351

326-
private Flux<SearchDocument> doScroll(SearchRequest searchRequest) {
352+
private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexCoordinates index) {
353+
354+
if (query instanceof BaseQuery baseQuery) {
355+
var pitKeepAlive = Duration.ofMinutes(5);
356+
// setup functions for Flux.usingWhen()
357+
Mono<PitSearchAfter> resourceSupplier = openPointInTime(index, pitKeepAlive, true)
358+
.map(pit -> new PitSearchAfter(baseQuery, pit));
359+
360+
Function<PitSearchAfter, Publisher<?>> asyncComplete = this::cleanupPit;
361+
362+
BiFunction<PitSearchAfter, Throwable, Publisher<?>> asyncError = (psa, ex) -> {
363+
if (LOGGER.isErrorEnabled()) {
364+
LOGGER.error(String.format("Error during pit/search_after"), ex);
365+
}
366+
return cleanupPit(psa);
367+
};
368+
369+
Function<PitSearchAfter, Publisher<?>> asyncCancel = psa -> {
370+
if (LOGGER.isWarnEnabled()) {
371+
LOGGER.warn(String.format("pit/search_after was cancelled"));
372+
}
373+
return cleanupPit(psa);
374+
};
327375

328-
Time scrollTimeout = searchRequest.scroll() != null ? searchRequest.scroll() : Time.of(t -> t.time("1m"));
376+
Function<PitSearchAfter, Publisher<? extends ResponseBody<EntityAsMap>>> resourceClosure = psa -> {
329377

330-
Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(Mono.fromSupplier(ScrollState::new), //
331-
state -> Mono
332-
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
333-
EntityAsMap.class))) //
334-
.expand(entityAsMapSearchResponse -> {
378+
baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive));
379+
baseQuery.addSort(Sort.by("_shard_doc"));
380+
SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
335381

336-
state.updateScrollId(entityAsMapSearchResponse.scrollId());
382+
return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
383+
.search(firstSearchRequest, EntityAsMap.class))).expand(entityAsMapSearchResponse -> {
337384

338-
if (entityAsMapSearchResponse.hits() == null
339-
|| CollectionUtils.isEmpty(entityAsMapSearchResponse.hits().hits())) {
385+
var hits = entityAsMapSearchResponse.hits().hits();
386+
if (CollectionUtils.isEmpty(hits)) {
340387
return Mono.empty();
341388
}
342389

343-
return Mono.from(execute((ClientCallback<Publisher<ScrollResponse<EntityAsMap>>>) client1 -> {
344-
ScrollRequest scrollRequest = ScrollRequest
345-
.of(sr -> sr.scrollId(state.getScrollId()).scroll(scrollTimeout));
346-
return client1.scroll(scrollRequest, EntityAsMap.class);
347-
}));
348-
}),
349-
this::cleanupScroll, (state, ex) -> cleanupScroll(state), this::cleanupScroll);
390+
List<Object> sortOptions = hits.get(hits.size() - 1).sort().stream().map(TypeUtils::toObject)
391+
.collect(Collectors.toList());
392+
baseQuery.setSearchAfter(sortOptions);
393+
SearchRequest followSearchRequest = requestConverter.searchRequest(baseQuery, clazz, index, false, true);
394+
return Mono.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client
395+
.search(followSearchRequest, EntityAsMap.class)));
396+
});
350397

351-
return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits())
352-
.map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper));
398+
};
399+
400+
Flux<ResponseBody<EntityAsMap>> searchResponses = Flux.usingWhen(resourceSupplier, resourceClosure, asyncComplete,
401+
asyncError, asyncCancel);
402+
return searchResponses.flatMapIterable(entityAsMapSearchResponse -> entityAsMapSearchResponse.hits().hits())
403+
.map(entityAsMapHit -> DocumentAdapters.from(entityAsMapHit, jsonpMapper));
404+
} else {
405+
return Flux.error(new IllegalArgumentException("Query must be derived from BaseQuery"));
406+
}
353407
}
354408

355-
private Publisher<?> cleanupScroll(ScrollState state) {
409+
private Publisher<?> cleanupPit(PitSearchAfter psa) {
410+
var baseQuery = psa.getBaseQuery();
411+
baseQuery.setPointInTime(null);
412+
baseQuery.setSearchAfter(null);
413+
baseQuery.setSort(psa.getSort());
414+
var pit = psa.getPit();
415+
return StringUtils.hasText(pit) ? closePointInTime(pit) : Mono.empty();
416+
}
417+
418+
static private class PitSearchAfter {
419+
private final BaseQuery baseQuery;
420+
@Nullable private final Sort sort;
421+
private final String pit;
422+
423+
PitSearchAfter(BaseQuery baseQuery, String pit) {
424+
this.baseQuery = baseQuery;
425+
this.sort = baseQuery.getSort();
426+
this.pit = pit;
427+
}
428+
429+
public BaseQuery getBaseQuery() {
430+
return baseQuery;
431+
}
356432

357-
if (state.getScrollIds().isEmpty()) {
358-
return Mono.empty();
433+
@Nullable
434+
public Sort getSort() {
435+
return sort;
359436
}
360437

361-
return execute((ClientCallback<Publisher<ClearScrollResponse>>) client -> client
362-
.clearScroll(ClearScrollRequest.of(csr -> csr.scrollId(state.getScrollIds()))));
438+
public String getPit() {
439+
return pit;
440+
}
363441
}
364442

365443
@Override
@@ -368,15 +446,17 @@ protected Mono<Long> doCount(Query query, Class<?> entityType, IndexCoordinates
368446
Assert.notNull(query, "query must not be null");
369447
Assert.notNull(index, "index must not be null");
370448

371-
SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true, false);
449+
SearchRequest searchRequest = requestConverter.searchRequest(query, entityType, index, true);
372450

373451
return Mono
374452
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
375453
EntityAsMap.class)))
376454
.map(searchResponse -> searchResponse.hits().total() != null ? searchResponse.hits().total().value() : 0L);
377455
}
378456

379-
private Flux<SearchDocument> doFind(SearchRequest searchRequest) {
457+
private Flux<SearchDocument> doFindBounded(Query query, Class<?> clazz, IndexCoordinates index) {
458+
459+
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
380460

381461
return Mono
382462
.from(execute((ClientCallback<Publisher<ResponseBody<EntityAsMap>>>) client -> client.search(searchRequest,
@@ -391,7 +471,7 @@ protected <T> Mono<SearchDocumentResponse> doFindForResponse(Query query, Class<
391471
Assert.notNull(query, "query must not be null");
392472
Assert.notNull(index, "index must not be null");
393473

394-
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false, false);
474+
SearchRequest searchRequest = requestConverter.searchRequest(query, clazz, index, false);
395475

396476
// noinspection unchecked
397477
SearchDocumentCallback<T> callback = new ReadSearchDocumentCallback<>((Class<T>) clazz, index);
@@ -458,29 +538,6 @@ public Mono<String> getClusterVersion() {
458538
})).map(infoResponse -> infoResponse.version().number());
459539
}
460540

461-
@Override
462-
public Mono<UpdateResponse> update(UpdateQuery updateQuery, IndexCoordinates index) {
463-
464-
Assert.notNull(updateQuery, "UpdateQuery must not be null");
465-
Assert.notNull(index, "Index must not be null");
466-
467-
UpdateRequest<Document, ?> request = requestConverter.documentUpdateRequest(updateQuery, index, getRefreshPolicy(),
468-
routingResolver.getRouting());
469-
470-
return Mono.from(execute(
471-
(ClientCallback<Publisher<co.elastic.clients.elasticsearch.core.UpdateResponse<Document>>>) client -> client
472-
.update(request, Document.class)))
473-
.flatMap(response -> {
474-
UpdateResponse.Result result = result(response.result());
475-
return result == null ? Mono.empty() : Mono.just(UpdateResponse.of(result));
476-
});
477-
}
478-
479-
@Override
480-
public Mono<ByQueryResponse> updateByQuery(UpdateQuery updateQuery, IndexCoordinates index) {
481-
throw new UnsupportedOperationException("not implemented");
482-
}
483-
484541
@Override
485542
@Deprecated
486543
public <T> Publisher<T> execute(ReactiveElasticsearchOperations.ClientCallback<Publisher<T>> callback) {

0 commit comments

Comments
 (0)